Skip to content

Commit b86f98f

Browse files
committed
Update WorkflowStatusLatestService to handle both create and update for CI/CD workflow statuses, refactor repository methods, and modify unique constraints in workflow status tables.
1 parent ee5672e commit b86f98f

File tree

3 files changed

+124
-25
lines changed

3 files changed

+124
-25
lines changed

internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@ import (
2626
type WorkflowStatusLatestRepository interface {
2727
// CI Workflow Status Latest methods
2828
SaveCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
29+
UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error
30+
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
2931
GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error)
3032

3133
// CD Workflow Status Latest methods
3234
SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
35+
UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error
36+
GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error)
3337
GetCdWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error)
3438
}
3539

@@ -99,6 +103,33 @@ func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipel
99103
return models, nil
100104
}
101105

106+
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCiWorkflowStatusLatest(tx *pg.Tx, model *CiWorkflowStatusLatest) error {
107+
var connection orm.DB
108+
if tx != nil {
109+
connection = tx
110+
} else {
111+
connection = impl.dbConnection
112+
}
113+
_, err := connection.Model(model).WherePK().Update()
114+
if err != nil {
115+
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "model", model)
116+
return err
117+
}
118+
return nil
119+
}
120+
121+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error) {
122+
var model CiWorkflowStatusLatest
123+
err := impl.dbConnection.Model(&model).
124+
Where("pipeline_id = ?", pipelineId).
125+
Select()
126+
if err != nil {
127+
impl.logger.Errorw("error in getting ci workflow status latest by pipeline id", "err", err, "pipelineId", pipelineId)
128+
return nil, err
129+
}
130+
return &model, nil
131+
}
132+
102133
// CD Workflow Status Latest methods implementation
103134
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
104135
var connection orm.DB
@@ -126,3 +157,37 @@ func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipel
126157
}
127158
return models, nil
128159
}
160+
161+
func (impl *WorkflowStatusLatestRepositoryImpl) UpdateCdWorkflowStatusLatest(tx *pg.Tx, model *CdWorkflowStatusLatest) error {
162+
var connection orm.DB
163+
if tx != nil {
164+
connection = tx
165+
} else {
166+
connection = impl.dbConnection
167+
}
168+
_, err := connection.Model(model).WherePK().Update()
169+
if err != nil {
170+
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "model", model)
171+
return err
172+
}
173+
return nil
174+
}
175+
176+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx *pg.Tx, pipelineId int, workflowType string) (*CdWorkflowStatusLatest, error) {
177+
var connection orm.DB
178+
if tx != nil {
179+
connection = tx
180+
} else {
181+
connection = impl.dbConnection
182+
}
183+
184+
var model CdWorkflowStatusLatest
185+
err := connection.Model(&model).
186+
Where("pipeline_id = ? AND workflow_type = ?", pipelineId, workflowType).
187+
Select()
188+
if err != nil {
189+
impl.logger.Errorw("error in getting cd workflow status latest by pipeline id and workflow type", "err", err, "pipelineId", pipelineId, "workflowType", workflowType)
190+
return nil, err
191+
}
192+
return &model, nil
193+
}

pkg/workflow/workflowStatusLatest/WorkflowStatusLatestService.go

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package workflowStatusLatest
1818

1919
import (
2020
"fmt"
21+
util2 "github.com/devtron-labs/devtron/internal/util"
2122
"time"
2223

2324
"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
@@ -83,18 +84,35 @@ func (impl *WorkflowStatusLatestServiceImpl) SaveCiWorkflowStatusLatest(tx *pg.T
8384
return fmt.Errorf("invalid ciWorkflowId: %d", ciWorkflowId)
8485
}
8586

86-
now := time.Now()
87-
model := &pipelineConfig.CiWorkflowStatusLatest{
88-
PipelineId: pipelineId,
89-
AppId: appId,
90-
CiWorkflowId: ciWorkflowId,
87+
// Check if entry exists
88+
existingEntry, err := impl.workflowStatusLatestRepository.GetCiWorkflowStatusLatestByPipelineId(pipelineId)
89+
if err != nil && !util2.IsErrNoRows(err) {
90+
impl.logger.Errorw("error in getting ci workflow status latest", "err", err, "pipelineId", pipelineId)
91+
return err
9192
}
92-
model.CreatedBy = userId
93-
model.CreatedOn = now
94-
model.UpdatedBy = userId
95-
model.UpdatedOn = now
9693

97-
return impl.workflowStatusLatestRepository.SaveCiWorkflowStatusLatest(tx, model)
94+
now := time.Now()
95+
if util2.IsErrNoRows(err) {
96+
// Create new entry
97+
model := &pipelineConfig.CiWorkflowStatusLatest{
98+
PipelineId: pipelineId,
99+
AppId: appId,
100+
CiWorkflowId: ciWorkflowId,
101+
}
102+
model.CreatedBy = userId
103+
model.CreatedOn = now
104+
model.UpdatedBy = userId
105+
model.UpdatedOn = now
106+
107+
return impl.workflowStatusLatestRepository.SaveCiWorkflowStatusLatest(tx, model)
108+
} else {
109+
// Update existing entry with latest workflow ID
110+
existingEntry.CiWorkflowId = ciWorkflowId
111+
existingEntry.UpdatedBy = userId
112+
existingEntry.UpdatedOn = now
113+
114+
return impl.workflowStatusLatestRepository.UpdateCiWorkflowStatusLatest(tx, existingEntry)
115+
}
98116
}
99117

100118
func (impl *WorkflowStatusLatestServiceImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*pipelineConfig.CiWorkflowStatusLatest, error) {
@@ -109,21 +127,37 @@ func (impl *WorkflowStatusLatestServiceImpl) SaveCdWorkflowStatusLatest(tx *pg.T
109127
return fmt.Errorf("invalid workflowRunnerId: %d", workflowRunnerId)
110128
}
111129

112-
// Create new entry (always save, don't update)
113-
now := time.Now()
114-
model := &pipelineConfig.CdWorkflowStatusLatest{
115-
PipelineId: pipelineId,
116-
AppId: appId,
117-
EnvironmentId: environmentId,
118-
WorkflowType: workflowType,
119-
WorkflowRunnerId: workflowRunnerId,
130+
// Check if entry exists
131+
existingEntry, err := impl.workflowStatusLatestRepository.GetCdWorkflowStatusLatestByPipelineIdAndWorkflowType(tx, pipelineId, workflowType)
132+
if err != nil && err != pg.ErrNoRows {
133+
impl.logger.Errorw("error in getting cd workflow status latest", "err", err, "pipelineId", pipelineId, "workflowType", workflowType)
134+
return err
120135
}
121-
model.CreatedBy = userId
122-
model.CreatedOn = now
123-
model.UpdatedBy = userId
124-
model.UpdatedOn = now
125136

126-
return impl.workflowStatusLatestRepository.SaveCdWorkflowStatusLatest(tx, model)
137+
now := time.Now()
138+
if err == pg.ErrNoRows {
139+
// Create new entry
140+
model := &pipelineConfig.CdWorkflowStatusLatest{
141+
PipelineId: pipelineId,
142+
AppId: appId,
143+
EnvironmentId: environmentId,
144+
WorkflowType: workflowType,
145+
WorkflowRunnerId: workflowRunnerId,
146+
}
147+
model.CreatedBy = userId
148+
model.CreatedOn = now
149+
model.UpdatedBy = userId
150+
model.UpdatedOn = now
151+
152+
return impl.workflowStatusLatestRepository.SaveCdWorkflowStatusLatest(tx, model)
153+
} else {
154+
// Update existing entry with latest workflow runner ID
155+
existingEntry.WorkflowRunnerId = workflowRunnerId
156+
existingEntry.UpdatedBy = userId
157+
existingEntry.UpdatedOn = now
158+
159+
return impl.workflowStatusLatestRepository.UpdateCdWorkflowStatusLatest(tx, existingEntry)
160+
}
127161
}
128162

129163
func (impl *WorkflowStatusLatestServiceImpl) GetCdWorkflowLatestByPipelineIds(pipelineIds []int) ([]*CdWorkflowStatusLatest, error) {

scripts/sql/34203900_workflow_status_latest_tables.up.sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ CREATE TABLE IF NOT EXISTS "public"."ci_workflow_status_latest" (
1414
"updated_on" timestamptz NOT NULL,
1515
"updated_by" int4 NOT NULL,
1616
PRIMARY KEY ("id"),
17-
UNIQUE ("ci_workflow_id")
17+
UNIQUE ("pipeline_id")
1818
);
1919

2020
-- Create Sequence for cd_workflow_status_latest
@@ -33,7 +33,7 @@ CREATE TABLE IF NOT EXISTS "public"."cd_workflow_status_latest" (
3333
"updated_on" timestamptz NOT NULL,
3434
"updated_by" int4 NOT NULL,
3535
PRIMARY KEY ("id"),
36-
UNIQUE ("workflow_runner_id", "workflow_type")
36+
UNIQUE ("pipeline_id", "workflow_type")
3737
);
3838

3939
COMMIT;

0 commit comments

Comments
 (0)