@@ -10,6 +10,7 @@ import (
1010
1111 actions_model "code.gitea.io/gitea/models/actions"
1212 "code.gitea.io/gitea/models/db"
13+ "code.gitea.io/gitea/modules/container"
1314 "code.gitea.io/gitea/modules/graceful"
1415 "code.gitea.io/gitea/modules/log"
1516 "code.gitea.io/gitea/modules/queue"
@@ -46,44 +47,87 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
4647}
4748
4849func checkJobsByRunID (ctx context.Context , runID int64 ) error {
49- run , exist , err := db . GetByID [ actions_model.ActionRun ] (ctx , runID )
50+ run , err := actions_model .GetRunByID (ctx , runID )
5051 if err != nil {
5152 return fmt .Errorf ("get action run: %w" , err )
5253 }
53- if ! exist {
54- return fmt .Errorf ("action run %d does not exist" , runID )
55- }
5654
5755 return db .WithTx (ctx , func (ctx context.Context ) error {
5856 // check jobs of the current run
5957 if err := checkJobsOfRun (ctx , run ); err != nil {
6058 return err
6159 }
6260
63- // check jobs by the concurrency group of the run
64- if len (run .ConcurrencyGroup ) == 0 {
65- return nil
61+ // check run (workflow-level) concurrency
62+ concurrentRunIDs := make (container.Set [int64 ])
63+ if len (run .ConcurrencyGroup ) > 0 && ! run .ConcurrencyCancel {
64+ concurrentRuns , err := db .Find [actions_model.ActionRun ](ctx , actions_model.FindRunOptions {
65+ RepoID : run .RepoID ,
66+ ConcurrencyGroup : run .ConcurrencyGroup ,
67+ Status : []actions_model.Status {actions_model .StatusBlocked },
68+ })
69+ if err != nil {
70+ return err
71+ }
72+ for _ , cRun := range concurrentRuns {
73+ concurrentRunIDs .Add (cRun .ID )
74+ if cRun .NeedApproval {
75+ continue
76+ }
77+ if err := checkJobsOfRun (ctx , cRun ); err != nil {
78+ return err
79+ }
80+ updatedRun , err := actions_model .GetRunByID (ctx , cRun .ID )
81+ if err != nil {
82+ return err
83+ }
84+ if updatedRun .Status == actions_model .StatusWaiting {
85+ // only run one blocked action run in the same concurrency group
86+ break
87+ }
88+ }
6689 }
67- concurrentActionRuns , err := db .Find [actions_model.ActionRun ](ctx , & actions_model.FindRunOptions {
68- RepoID : run .RepoID ,
69- ConcurrencyGroup : run .ConcurrencyGroup ,
70- Status : []actions_model.Status {
71- actions_model .StatusBlocked ,
72- },
73- SortType : "oldest" ,
74- })
90+
91+ // check job concurrency
92+ concurrentJobs , err := db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {RunID : run .ID })
7593 if err != nil {
76- return fmt . Errorf ( "find action run with concurrency group %s: %w" , run . ConcurrencyGroup , err )
94+ return err
7795 }
78- for _ , cRun := range concurrentActionRuns {
79- if cRun .NeedApproval {
80- continue
81- }
82- if err := checkJobsOfRun (ctx , cRun ); err != nil {
83- return err
96+ for _ , job := range concurrentJobs {
97+ if job .Status .IsDone () && len (job .ConcurrencyGroup ) > 0 && ! job .ConcurrencyCancel {
98+ concurrentJobs , err := db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {
99+ RepoID : job .RepoID ,
100+ ConcurrencyGroup : job .ConcurrencyGroup ,
101+ Statuses : []actions_model.Status {actions_model .StatusBlocked },
102+ })
103+ if err != nil {
104+ return err
105+ }
106+ for _ , cJob := range concurrentJobs {
107+ if concurrentRunIDs .Contains (cJob .RunID ) {
108+ continue
109+ }
110+ cRun , err := actions_model .GetRunByID (ctx , cJob .RunID )
111+ if err != nil {
112+ return err
113+ }
114+ if cRun .NeedApproval {
115+ continue
116+ }
117+ if err := checkJobsOfRun (ctx , cRun ); err != nil {
118+ return err
119+ }
120+ updatedJob , err := actions_model .GetRunJobByID (ctx , cJob .ID )
121+ if err != nil {
122+ return err
123+ }
124+ if updatedJob .Status == actions_model .StatusWaiting {
125+ break
126+ }
127+ }
84128 }
85- break // only run one blocked action run with the same concurrency group
86129 }
130+
87131 return nil
88132 })
89133}
@@ -189,7 +233,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
189233 }
190234 if allDone {
191235 // check concurrency
192- blockedByJobConcurrency , err := checkJobConcurrency (ctx , r .jobMap [id ], r .vars )
236+ blockedByJobConcurrency , err := checkConcurrencyForJobWithNeeds (ctx , r .jobMap [id ], r .vars )
193237 if err != nil {
194238 log .Error ("Check run %d job %d concurrency: %v. This job will stay blocked." )
195239 continue
@@ -223,7 +267,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
223267 return ret
224268}
225269
226- func checkJobConcurrency (ctx context.Context , actionRunJob * actions_model.ActionRunJob , vars map [string ]string ) (bool , error ) {
270+ func checkConcurrencyForJobWithNeeds (ctx context.Context , actionRunJob * actions_model.ActionRunJob , vars map [string ]string ) (bool , error ) {
227271 if len (actionRunJob .RawConcurrencyGroup ) == 0 {
228272 return false , nil
229273 }
0 commit comments