@@ -20,7 +20,6 @@ import (
2020 "code.gitea.io/gitea/modules/util"
2121 webhook_module "code.gitea.io/gitea/modules/webhook"
2222
23- "github.com/nektos/act/pkg/jobparser"
2423 "xorm.io/builder"
2524)
2625
@@ -47,6 +46,8 @@ type ActionRun struct {
4746 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4847 Status Status `xorm:"index"`
4948 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
49+ ConcurrencyGroup string
50+ ConcurrencyCancel bool
5051 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5152 Started timeutil.TimeStamp
5253 Stopped timeutil.TimeStamp
@@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
168169 return run .ScheduleID > 0
169170}
170171
171- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
172+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
172173 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
173174 SetExpr ("num_action_runs" ,
174175 builder .Select ("count(*)" ).From ("action_run" ).
@@ -222,119 +223,50 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
222223 return err
223224 }
224225
225- // Iterate over each job and attempt to cancel it.
226- for _ , job := range jobs {
227- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
228- status := job .Status
229- if status .IsDone () {
230- continue
231- }
232-
233- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
234- if job .TaskID == 0 {
235- job .Status = StatusCancelled
236- job .Stopped = timeutil .TimeStampNow ()
237-
238- // Update the job's status and stopped time in the database.
239- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
240- if err != nil {
241- return err
242- }
243-
244- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
245- if n == 0 {
246- return fmt .Errorf ("job has changed, try again" )
247- }
248-
249- // Continue with the next job.
250- continue
251- }
252-
253- // If the job has an associated task, try to stop the task, effectively cancelling the job.
254- if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
255- return err
256- }
226+ if err := CancelJobs (ctx , jobs ); err != nil {
227+ return err
257228 }
258229 }
259230
260231 // Return nil to indicate successful cancellation of all running and waiting jobs.
261232 return nil
262233}
263234
264- // InsertRun inserts a run
265- // The title will be cut off at 255 characters if it's longer than 255 characters.
266- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
267- ctx , committer , err := db .TxContext (ctx )
268- if err != nil {
269- return err
270- }
271- defer committer .Close ()
272-
273- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
274- if err != nil {
275- return err
276- }
277- run .Index = index
278- run .Title = util .EllipsisDisplayString (run .Title , 255 )
235+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) error {
236+ // Iterate over each job and attempt to cancel it.
237+ for _ , job := range jobs {
238+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
239+ status := job .Status
240+ if status .IsDone () {
241+ continue
242+ }
279243
280- if err := db .Insert (ctx , run ); err != nil {
281- return err
282- }
244+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
245+ if job .TaskID == 0 {
246+ job .Status = StatusCancelled
247+ job .Stopped = timeutil .TimeStampNow ()
283248
284- if run .Repo == nil {
285- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
286- if err != nil {
287- return err
288- }
289- run .Repo = repo
290- }
249+ // Update the job's status and stopped time in the database.
250+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
251+ if err != nil {
252+ return err
253+ }
291254
292- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
293- return err
294- }
255+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
256+ if n == 0 {
257+ return fmt .Errorf ("job has changed, try again" )
258+ }
295259
296- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
297- var hasWaiting bool
298- for _ , v := range jobs {
299- id , job := v .Job ()
300- needs := job .Needs ()
301- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
302- return err
260+ // Continue with the next job.
261+ continue
303262 }
304- payload , _ := v .Marshal ()
305- status := StatusWaiting
306- if len (needs ) > 0 || run .NeedApproval {
307- status = StatusBlocked
308- } else {
309- hasWaiting = true
310- }
311- job .Name = util .EllipsisDisplayString (job .Name , 255 )
312- runJobs = append (runJobs , & ActionRunJob {
313- RunID : run .ID ,
314- RepoID : run .RepoID ,
315- OwnerID : run .OwnerID ,
316- CommitSHA : run .CommitSHA ,
317- IsForkPullRequest : run .IsForkPullRequest ,
318- Name : job .Name ,
319- WorkflowPayload : payload ,
320- JobID : id ,
321- Needs : needs ,
322- RunsOn : job .RunsOn (),
323- Status : status ,
324- })
325- }
326- if err := db .Insert (ctx , runJobs ); err != nil {
327- return err
328- }
329263
330- // if there is a job in the waiting status, increase tasks version.
331- if hasWaiting {
332- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
264+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
265+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
333266 return err
334267 }
335268 }
336-
337- return committer .Commit ()
269+ return nil
338270}
339271
340272func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
@@ -426,7 +358,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426358 }
427359 run .Repo = repo
428360 }
429- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
361+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
430362 return err
431363 }
432364 }
@@ -435,3 +367,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435367}
436368
437369type ActionRunIndex db.ResourceIndex
370+
371+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
372+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
373+ return false , nil
374+ }
375+
376+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
377+ RepoID : actionRun .RepoID ,
378+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
379+ Status : []Status {StatusWaiting , StatusRunning },
380+ })
381+ if err != nil {
382+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
383+ }
384+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
385+
386+ return len (previousRuns ) > 0 , nil
387+ }
0 commit comments