@@ -21,7 +21,6 @@ import (
2121 "code.gitea.io/gitea/modules/util"
2222 webhook_module "code.gitea.io/gitea/modules/webhook"
2323
24- "github.com/nektos/act/pkg/jobparser"
2524 "xorm.io/builder"
2625)
2726
@@ -48,6 +47,8 @@ type ActionRun struct {
4847 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4948 Status Status `xorm:"index"`
5049 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
50+ ConcurrencyGroup string
51+ ConcurrencyCancel bool
5152 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5253 Started timeutil.TimeStamp
5354 Stopped timeutil.TimeStamp
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170 return run .ScheduleID > 0
170171}
171172
172- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
173+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
173174 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
174175 SetExpr ("num_action_runs" ,
175176 builder .Select ("count(*)" ).From ("action_run" ).
@@ -266,80 +267,41 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
266267 return cancelledJobs , nil
267268}
268269
269- // InsertRun inserts a run
270- // The title will be cut off at 255 characters if it's longer than 255 characters .
271- func InsertRun ( ctx context. Context , run * ActionRun , jobs [] * jobparser. SingleWorkflow ) error {
272- ctx , committer , err := db . TxContext ( ctx )
273- if err != nil {
274- return err
275- }
276- defer committer . Close ()
270+ func CancelJobs ( ctx context. Context , jobs [] * ActionRunJob ) error {
271+ // Iterate over each job and attempt to cancel it .
272+ for _ , job := range jobs {
273+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
274+ status := job . Status
275+ if status . IsDone () {
276+ continue
277+ }
277278
278- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
279- if err != nil {
280- return err
281- }
282- run .Index = index
283- run .Title = util .EllipsisDisplayString (run .Title , 255 )
279+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
280+ if job .TaskID == 0 {
281+ job .Status = StatusCancelled
282+ job .Stopped = timeutil .TimeStampNow ()
284283
285- if err := db .Insert (ctx , run ); err != nil {
286- return err
287- }
288-
289- if run .Repo == nil {
290- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
291- if err != nil {
292- return err
293- }
294- run .Repo = repo
295- }
284+ // Update the job's status and stopped time in the database.
285+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
286+ if err != nil {
287+ return err
288+ }
296289
297- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
298- return err
299- }
290+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
291+ if n == 0 {
292+ return fmt .Errorf ("job has changed, try again" )
293+ }
300294
301- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
302- var hasWaiting bool
303- for _ , v := range jobs {
304- id , job := v .Job ()
305- needs := job .Needs ()
306- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
307- return err
308- }
309- payload , _ := v .Marshal ()
310- status := StatusWaiting
311- if len (needs ) > 0 || run .NeedApproval {
312- status = StatusBlocked
313- } else {
314- hasWaiting = true
295+ // Continue with the next job.
296+ continue
315297 }
316- job .Name = util .EllipsisDisplayString (job .Name , 255 )
317- runJobs = append (runJobs , & ActionRunJob {
318- RunID : run .ID ,
319- RepoID : run .RepoID ,
320- OwnerID : run .OwnerID ,
321- CommitSHA : run .CommitSHA ,
322- IsForkPullRequest : run .IsForkPullRequest ,
323- Name : job .Name ,
324- WorkflowPayload : payload ,
325- JobID : id ,
326- Needs : needs ,
327- RunsOn : job .RunsOn (),
328- Status : status ,
329- })
330- }
331- if err := db .Insert (ctx , runJobs ); err != nil {
332- return err
333- }
334298
335- // if there is a job in the waiting status, increase tasks version.
336- if hasWaiting {
337- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
299+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
300+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
338301 return err
339302 }
340303 }
341-
342- return committer .Commit ()
304+ return nil
343305}
344306
345307func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
@@ -431,7 +393,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
431393 }
432394 run .Repo = repo
433395 }
434- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
396+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
435397 return err
436398 }
437399 }
@@ -440,3 +402,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
440402}
441403
442404type ActionRunIndex db.ResourceIndex
405+
406+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
407+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
408+ return false , nil
409+ }
410+
411+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
412+ RepoID : actionRun .RepoID ,
413+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
414+ Status : []Status {StatusWaiting , StatusRunning },
415+ })
416+ if err != nil {
417+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
418+ }
419+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
420+
421+ return len (previousRuns ) > 0 , nil
422+ }
0 commit comments