Skip to content

Commit fc4c6aa

Browse files
committed
Implement hybrid CI workflow status fetching in CiHandlerImpl using ci_workflow_status_latest table with fallback to legacy query methods. Update repositories with new helper methods to support this approach.
1 parent 96815d3 commit fc4c6aa

File tree

4 files changed

+294
-71
lines changed

4 files changed

+294
-71
lines changed

internal/sql/repository/pipelineConfig/CiWorkflowRepository.go

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type CiWorkflowRepository interface {
4040
FindByName(name string) (*CiWorkflow, error)
4141

4242
FindLastTriggeredWorkflowByCiIds(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
43-
FindLastTriggeredWorkflowByCiIdsOptimized(pipelineId []int) (ciWorkflow []*CiWorkflow, err error)
43+
FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error)
4444
FindLastTriggeredWorkflowByArtifactId(ciArtifactId int) (ciWorkflow *CiWorkflow, err error)
4545
FindAllLastTriggeredWorkflowByArtifactId(ciArtifactId []int) (ciWorkflow []*CiWorkflow, err error)
4646
FindAllTriggeredWorkflowCountInLast24Hour() (ciWorkflowCount int, err error)
@@ -49,6 +49,7 @@ type CiWorkflowRepository interface {
4949
ExistsByStatus(status string) (bool, error)
5050
FindBuildTypeAndStatusDataOfLast1Day() ([]*BuildTypeCount, error)
5151
FIndCiWorkflowStatusesByAppId(appId int) ([]*CiWorkflowStatus, error)
52+
FindCiPipelineIdsByAppId(appId int) ([]int, error)
5253

5354
MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool)
5455
MigrateCiArtifactLocation(wfId int, artifactLocation string)
@@ -291,20 +292,16 @@ func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIds(pipelineI
291292
return ciWorkflow, err
292293
}
293294

294-
// FindLastTriggeredWorkflowByCiIdsOptimized uses the ci_workflow_status_latest table for better performance
295-
func (impl *CiWorkflowRepositoryImpl) FindLastTriggeredWorkflowByCiIdsOptimized(pipelineId []int) (ciWorkflow []*CiWorkflow, err error) {
295+
// FindWorkflowsByCiWorkflowIds fetches workflows by their workflow IDs (simple query)
296+
func (impl *CiWorkflowRepositoryImpl) FindWorkflowsByCiWorkflowIds(ciWorkflowIds []int) (ciWorkflow []*CiWorkflow, err error) {
297+
if len(ciWorkflowIds) == 0 {
298+
return ciWorkflow, nil
299+
}
300+
296301
err = impl.dbConnection.Model(&ciWorkflow).
297302
Column("ci_workflow.*", "CiPipeline").
298-
Join("INNER JOIN ci_workflow_status_latest cwsl ON cwsl.ci_workflow_id = ci_workflow.id").
299-
Where("cwsl.pipeline_id IN (?)", pg.In(pipelineId)).
303+
Where("ci_workflow.id IN (?)", pg.In(ciWorkflowIds)).
300304
Select()
301-
302-
if err != nil {
303-
impl.logger.Errorw("error in optimized query, falling back to old method", "err", err, "pipelineIds", pipelineId)
304-
// Fallback to the old method if optimized query fails
305-
return impl.FindLastTriggeredWorkflowByCiIds(pipelineId)
306-
}
307-
308305
return ciWorkflow, err
309306
}
310307

@@ -397,6 +394,16 @@ func (impl *CiWorkflowRepositoryImpl) FIndCiWorkflowStatusesByAppId(appId int) (
397394
return ciworkflowStatuses, err
398395
}
399396

397+
// FindCiPipelineIdsByAppId gets all CI pipeline IDs for an app (simple query)
398+
func (impl *CiWorkflowRepositoryImpl) FindCiPipelineIdsByAppId(appId int) ([]int, error) {
399+
var ciPipelineIds []int
400+
err := impl.dbConnection.Model((*CiPipeline)(nil)).
401+
Column("id").
402+
Where("app_id = ? AND deleted = false", appId).
403+
Select(&ciPipelineIds)
404+
return ciPipelineIds, err
405+
}
406+
400407
func (impl *CiWorkflowRepositoryImpl) MigrateIsArtifactUploaded(wfId int, isArtifactUploaded bool) {
401408
_, err := impl.dbConnection.Model((*CiWorkflow)(nil)).
402409
Set("is_artifact_uploaded = ?", workflow.GetArtifactUploadedType(isArtifactUploaded)).

internal/sql/repository/pipelineConfig/WorkflowStatusLatestRepository.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type WorkflowStatusLatestRepository interface {
2828
UpdateCiWorkflowStatusLatest(model *CiWorkflowStatusLatest) error
2929
GetCiWorkflowStatusLatestByPipelineId(pipelineId int) (*CiWorkflowStatusLatest, error)
3030
GetCiWorkflowStatusLatestByAppId(appId int) ([]*CiWorkflowStatusLatest, error)
31+
GetCachedPipelineIds(pipelineIds []int) ([]int, error)
32+
GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error)
3133
DeleteCiWorkflowStatusLatestByPipelineId(pipelineId int) error
3234

3335
// CD Workflow Status Latest methods
@@ -127,6 +129,39 @@ func (impl *WorkflowStatusLatestRepositoryImpl) DeleteCiWorkflowStatusLatestByPi
127129
return nil
128130
}
129131

132+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCachedPipelineIds(pipelineIds []int) ([]int, error) {
133+
if len(pipelineIds) == 0 {
134+
return []int{}, nil
135+
}
136+
137+
var cachedPipelineIds []int
138+
err := impl.dbConnection.Model(&CiWorkflowStatusLatest{}).
139+
Column("pipeline_id").
140+
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
141+
Select(&cachedPipelineIds)
142+
if err != nil {
143+
impl.logger.Errorw("error in getting cached pipeline ids", "err", err, "pipelineIds", pipelineIds)
144+
return nil, err
145+
}
146+
return cachedPipelineIds, nil
147+
}
148+
149+
func (impl *WorkflowStatusLatestRepositoryImpl) GetCiWorkflowStatusLatestByPipelineIds(pipelineIds []int) ([]*CiWorkflowStatusLatest, error) {
150+
if len(pipelineIds) == 0 {
151+
return []*CiWorkflowStatusLatest{}, nil
152+
}
153+
154+
var models []*CiWorkflowStatusLatest
155+
err := impl.dbConnection.Model(&models).
156+
Where("pipeline_id IN (?)", pg.In(pipelineIds)).
157+
Select()
158+
if err != nil {
159+
impl.logger.Errorw("error in getting ci workflow status latest by pipeline ids", "err", err, "pipelineIds", pipelineIds)
160+
return nil, err
161+
}
162+
return models, nil
163+
}
164+
130165
// CD Workflow Status Latest methods implementation
131166
func (impl *WorkflowStatusLatestRepositoryImpl) SaveCdWorkflowStatusLatest(model *CdWorkflowStatusLatest) error {
132167
err := impl.dbConnection.Insert(model)

0 commit comments

Comments
 (0)