@@ -27,6 +27,7 @@ import (
27
27
buildBean "github.com/devtron-labs/devtron/pkg/build/pipeline/bean"
28
28
repository2 "github.com/devtron-labs/devtron/pkg/cluster/environment/repository"
29
29
eventProcessorBean "github.com/devtron-labs/devtron/pkg/eventProcessor/bean"
30
+ "github.com/devtron-labs/devtron/pkg/pipeline/adapter"
30
31
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
31
32
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
32
33
"regexp"
@@ -647,100 +648,79 @@ func (impl *CiHandlerImpl) stateChanged(status string, podStatus string, msg str
647
648
}
648
649
649
650
func (impl * CiHandlerImpl ) FetchCiStatusForTriggerViewV1 (appId int ) ([]* pipelineConfig.CiWorkflowStatus , error ) {
650
- // Get all CI pipeline IDs for this app
651
651
allPipelineIds , err := impl .ciWorkflowRepository .FindCiPipelineIdsByAppId (appId )
652
652
if err != nil {
653
- impl .Logger .Errorw ("error in getting ci pipeline ids for app, falling back to old method" , "err " , err , "appId " , appId )
653
+ impl .Logger .Errorw ("error in getting ci pipeline ids for app, falling back to old method" , "appId " , appId , "err " , err )
654
654
return impl .ciWorkflowRepository .FIndCiWorkflowStatusesByAppId (appId )
655
655
}
656
656
657
657
if len (allPipelineIds ) == 0 {
658
658
return []* pipelineConfig.CiWorkflowStatus {}, nil
659
659
}
660
660
661
- // Find which pipeline IDs have entries in latest status table
662
661
pipelinesInLatestTable , err := impl .workflowStatusLatestRepository .GetByPipelineIds (allPipelineIds )
663
662
if err != nil {
664
- impl .Logger .Errorw ("error in checking latest status table, falling back to old method" , "err " , err , "appId " , appId )
663
+ impl .Logger .Errorw ("error in checking latest status table, falling back to old method" , "appId " , appId , "err " , err )
665
664
return impl .ciWorkflowRepository .FIndCiWorkflowStatusesByAppId (appId )
666
665
}
667
666
668
667
var allStatuses []* pipelineConfig.CiWorkflowStatus
669
668
670
- // Fetch from latest status table for available pipelines
671
669
if len (pipelinesInLatestTable ) > 0 {
672
670
statusesFromLatestTable , err := impl .fetchCiStatusFromLatestTable (pipelinesInLatestTable )
673
671
if err != nil {
674
- impl .Logger .Errorw ("error in fetching from latest status table" , "err " , err , "pipelineIds " , pipelinesInLatestTable )
672
+ impl .Logger .Errorw ("error in fetching from latest status table" , "pipelineIds " , pipelinesInLatestTable , "err " , err )
675
673
return nil , err
676
674
} else {
677
675
allStatuses = append (allStatuses , statusesFromLatestTable ... )
678
676
}
679
677
}
680
678
681
- // Find pipeline IDs that are NOT in latest status table
682
- pipelinesNotInLatestTable := impl .findMissingPipelineIds (allPipelineIds , pipelinesInLatestTable )
679
+ pipelinesNotInLatestTable := impl .getPipelineIdsNotInLatestTable (allPipelineIds , pipelinesInLatestTable )
683
680
684
- // Fetch using complex query for missing pipeline IDs
685
681
if len (pipelinesNotInLatestTable ) > 0 {
686
- statusesFromComplexQuery , err := impl .fetchCiStatusUsingComplexQuery (pipelinesNotInLatestTable )
682
+ statusesFromComplexQuery , err := impl .fetchCiStatusUsingFallbackMethod (pipelinesNotInLatestTable )
687
683
if err != nil {
688
- impl .Logger .Errorw ("error in fetching using complex query" , "err " , err , "pipelineIds " , pipelinesNotInLatestTable )
684
+ impl .Logger .Errorw ("error in fetching using complex query" , "pipelineIds " , pipelinesNotInLatestTable , "err " , err )
689
685
return nil , err
690
686
} else {
691
687
allStatuses = append (allStatuses , statusesFromComplexQuery ... )
692
688
}
693
689
}
694
690
695
- impl .Logger .Debugw ("hybrid ci status fetch completed" ,
696
- "appId" , appId ,
697
- "totalPipelines" , len (allPipelineIds ),
698
- "pipelinesFromLatestTable" , len (pipelinesInLatestTable ),
699
- "pipelinesFromComplexQuery" , len (pipelinesNotInLatestTable ),
700
- "totalResults" , len (allStatuses ))
691
+ impl .Logger .Debugw ("hybrid ci status fetch completed" , "appId" , appId , "totalPipelines" , len (allPipelineIds ), "pipelinesFromLatestTable" , len (pipelinesInLatestTable ), "pipelinesFromOldQuery" , len (pipelinesNotInLatestTable ))
701
692
702
693
return allStatuses , nil
703
694
}
704
695
705
696
// fetchCiStatusFromLatestTable fetches CI status from ci_workflow_status_latest table
706
697
func (impl * CiHandlerImpl ) fetchCiStatusFromLatestTable (pipelineIds []int ) ([]* pipelineConfig.CiWorkflowStatus , error ) {
707
- // Get entries from latest status table
708
698
latestStatusEntries , err := impl .workflowStatusLatestRepository .GetCiWorkflowStatusLatestByPipelineIds (pipelineIds )
709
699
if err != nil {
710
700
return nil , err
711
701
}
712
702
713
- // Extract workflow IDs
714
703
var workflowIds []int
715
704
for _ , entry := range latestStatusEntries {
716
705
workflowIds = append (workflowIds , entry .CiWorkflowId )
717
706
}
718
707
719
- // Get workflows by IDs
720
708
workflows , err := impl .ciWorkflowRepository .FindWorkflowsByCiWorkflowIds (workflowIds )
721
709
if err != nil {
722
710
return nil , err
723
711
}
724
712
725
- // Convert to CiWorkflowStatus format
726
713
var statuses []* pipelineConfig.CiWorkflowStatus
727
714
for _ , workflow := range workflows {
728
- status := & pipelineConfig.CiWorkflowStatus {
729
- CiWorkflowId : workflow .Id ,
730
- CiPipelineName : workflow .CiPipeline .Name ,
731
- CiPipelineId : workflow .CiPipelineId ,
732
- CiStatus : workflow .Status ,
733
- StorageConfigured : workflow .BlobStorageEnabled ,
734
- }
715
+ status := adapter .GetCiWorkflowStatusFromCiWorkflow (workflow )
735
716
statuses = append (statuses , status )
736
717
}
737
718
738
719
return statuses , nil
739
720
}
740
721
741
- // fetchCiStatusUsingComplexQuery fetches CI status using complex query for specific pipeline IDs
742
- func (impl * CiHandlerImpl ) fetchCiStatusUsingComplexQuery (pipelineIds []int ) ([]* pipelineConfig.CiWorkflowStatus , error ) {
743
- // Get latest workflows for these pipelines
722
+ // fetchCiStatusUsingFallbackMethod fetches CI status directly from workflow table having multiple joins
723
+ func (impl * CiHandlerImpl ) fetchCiStatusUsingFallbackMethod (pipelineIds []int ) ([]* pipelineConfig.CiWorkflowStatus , error ) {
744
724
workflows , err := impl .ciWorkflowRepository .FindLastTriggeredWorkflowByCiIds (pipelineIds )
745
725
if err != nil {
746
726
return nil , err
@@ -749,13 +729,7 @@ func (impl *CiHandlerImpl) fetchCiStatusUsingComplexQuery(pipelineIds []int) ([]
749
729
// Convert to CiWorkflowStatus format
750
730
var statuses []* pipelineConfig.CiWorkflowStatus
751
731
for _ , workflow := range workflows {
752
- status := & pipelineConfig.CiWorkflowStatus {
753
- CiWorkflowId : workflow .Id ,
754
- CiPipelineName : workflow .CiPipeline .Name ,
755
- CiPipelineId : workflow .CiPipelineId ,
756
- CiStatus : workflow .Status ,
757
- StorageConfigured : workflow .BlobStorageEnabled ,
758
- }
732
+ status := adapter .GetCiWorkflowStatusFromCiWorkflow (workflow )
759
733
statuses = append (statuses , status )
760
734
}
761
735
@@ -787,50 +761,43 @@ func (impl *CiHandlerImpl) fetchLastTriggeredWorkflowsHybrid(pipelineIds []int)
787
761
return []* pipelineConfig.CiWorkflow {}, nil
788
762
}
789
763
790
- // Find which pipeline IDs have entries in latest status table
791
764
pipelinesInLatestTable , err := impl .workflowStatusLatestRepository .GetByPipelineIds (pipelineIds )
792
765
if err != nil {
793
- impl .Logger .Errorw ("error in checking latest status table, falling back to complex query" , "err " , err , "pipelineIds " , pipelineIds )
766
+ impl .Logger .Errorw ("error in checking latest status table, falling back to complex query" , "pipelineIds " , pipelineIds , "err " , err )
794
767
return impl .ciWorkflowRepository .FindLastTriggeredWorkflowByCiIds (pipelineIds )
795
768
}
796
769
797
770
var allWorkflows []* pipelineConfig.CiWorkflow
798
771
799
- // Fetch from latest status table for available pipelines
800
772
if len (pipelinesInLatestTable ) > 0 {
801
773
workflowsFromLatestTable , err := impl .fetchWorkflowsFromLatestTable (pipelinesInLatestTable )
802
774
if err != nil {
803
- impl .Logger .Errorw ("error in fetching from latest status table" , "err " , err , "pipelineIds " , pipelinesInLatestTable )
775
+ impl .Logger .Errorw ("error in fetching from latest status table" , "pipelineIds " , pipelinesInLatestTable , "err " , err )
804
776
return nil , err
805
777
} else {
806
778
allWorkflows = append (allWorkflows , workflowsFromLatestTable ... )
807
779
}
808
780
}
809
781
810
- // Find pipeline IDs that are NOT in latest status table
811
- pipelinesNotInLatestTable := impl .findMissingPipelineIds (pipelineIds , pipelinesInLatestTable )
782
+ pipelinesNotInLatestTable := impl .getPipelineIdsNotInLatestTable (pipelineIds , pipelinesInLatestTable )
812
783
813
- // Fetch using complex query for missing pipeline IDs
814
784
if len (pipelinesNotInLatestTable ) > 0 {
815
785
workflowsFromComplexQuery , err := impl .ciWorkflowRepository .FindLastTriggeredWorkflowByCiIds (pipelinesNotInLatestTable )
816
786
if err != nil {
817
- impl .Logger .Errorw ("error in fetching using complex query" , "err " , err , "pipelineIds " , pipelinesNotInLatestTable )
787
+ impl .Logger .Errorw ("error in fetching using complex query" , "pipelineIds " , pipelinesNotInLatestTable , "err " , err )
818
788
return nil , err
819
789
} else {
820
790
allWorkflows = append (allWorkflows , workflowsFromComplexQuery ... )
821
791
}
822
792
}
823
793
824
- impl .Logger .Debugw ("hybrid workflow fetch completed" ,
825
- "totalPipelines" , len (pipelineIds ),
826
- "pipelinesFromLatestTable" , len (pipelinesInLatestTable ),
827
- "pipelinesFromOldQuery" , len (pipelinesNotInLatestTable ))
794
+ impl .Logger .Debugw ("hybrid workflow fetch completed" , "totalPipelines" , len (pipelineIds ), "pipelinesFromLatestTable" , len (pipelinesInLatestTable ), "pipelinesFromOldQuery" , len (pipelinesNotInLatestTable ))
828
795
829
796
return allWorkflows , nil
830
797
}
831
798
832
- // findMissingPipelineIds finds pipeline IDs that are NOT in the latest status table
833
- func (impl * CiHandlerImpl ) findMissingPipelineIds (allPipelineIds , pipelinesInLatestTable []int ) []int {
799
+ // getPipelineIdsNotInLatestTable finds pipeline IDs that are NOT in the latest status table
800
+ func (impl * CiHandlerImpl ) getPipelineIdsNotInLatestTable (allPipelineIds , pipelinesInLatestTable []int ) []int {
834
801
pipelineIdMap := make (map [int ]bool )
835
802
for _ , id := range pipelinesInLatestTable {
836
803
pipelineIdMap [id ] = true
@@ -1053,7 +1020,6 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res
1053
1020
if len (ciPipelineIds ) == 0 {
1054
1021
return ciWorkflowStatuses , nil
1055
1022
}
1056
- // Hybrid approach: Use latest status table for available pipelines, fallback to complex old query for others
1057
1023
latestCiWorkflows , err := impl .fetchLastTriggeredWorkflowsHybrid (ciPipelineIds )
1058
1024
if err != nil && ! util .IsErrNoRows (err ) {
1059
1025
impl .Logger .Errorw ("err in hybrid ci workflow fetch" , "ciPipelineIds" , ciPipelineIds , "err" , err )
0 commit comments