@@ -21,7 +21,6 @@ import (
2121 "code.gitea.io/gitea/modules/util"
2222 webhook_module "code.gitea.io/gitea/modules/webhook"
2323
24- "github.com/nektos/act/pkg/jobparser"
2524 "xorm.io/builder"
2625)
2726
@@ -48,6 +47,8 @@ type ActionRun struct {
4847 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4948 Status Status `xorm:"index"`
5049 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
50+ ConcurrencyGroup string `xorm:"index"`
51+ ConcurrencyCancel bool
5152 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5253 Started timeutil.TimeStamp
5354 Stopped timeutil.TimeStamp
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170 return run .ScheduleID > 0
170171}
171172
172- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
173+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
173174 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
174175 NoAutoTime ().
175176 SetExpr ("num_action_runs" ,
@@ -226,121 +227,57 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
226227 return cancelledJobs , err
227228 }
228229
229- // Iterate over each job and attempt to cancel it.
230- for _ , job := range jobs {
231- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
232- status := job .Status
233- if status .IsDone () {
234- continue
235- }
236-
237- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
238- if job .TaskID == 0 {
239- job .Status = StatusCancelled
240- job .Stopped = timeutil .TimeStampNow ()
241-
242- // Update the job's status and stopped time in the database.
243- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
244- if err != nil {
245- return cancelledJobs , err
246- }
247-
248- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
249- if n == 0 {
250- return cancelledJobs , errors .New ("job has changed, try again" )
251- }
252-
253- cancelledJobs = append (cancelledJobs , job )
254- // Continue with the next job.
255- continue
256- }
257-
258- // If the job has an associated task, try to stop the task, effectively cancelling the job.
259- if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
260- return cancelledJobs , err
261- }
262- cancelledJobs = append (cancelledJobs , job )
230+ cjs , err := CancelJobs (ctx , jobs )
231+ if err != nil {
232+ return cancelledJobs , err
263233 }
234+ cancelledJobs = append (cancelledJobs , cjs ... )
264235 }
265236
266237 // Return nil to indicate successful cancellation of all running and waiting jobs.
267238 return cancelledJobs , nil
268239}
269240
270- // InsertRun inserts a run
271- // The title will be cut off at 255 characters if it's longer than 255 characters.
272- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
273- ctx , committer , err := db .TxContext (ctx )
274- if err != nil {
275- return err
276- }
277- defer committer .Close ()
278-
279- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
280- if err != nil {
281- return err
282- }
283- run .Index = index
284- run .Title = util .EllipsisDisplayString (run .Title , 255 )
241+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) ([]* ActionRunJob , error ) {
242+ cancelledJobs := make ([]* ActionRunJob , 0 , len (jobs ))
243+ // Iterate over each job and attempt to cancel it.
244+ for _ , job := range jobs {
245+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
246+ status := job .Status
247+ if status .IsDone () {
248+ continue
249+ }
285250
286- if err := db .Insert (ctx , run ); err != nil {
287- return err
288- }
251+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
252+ if job .TaskID == 0 {
253+ job .Status = StatusCancelled
254+ job .Stopped = timeutil .TimeStampNow ()
289255
290- if run .Repo == nil {
291- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
292- if err != nil {
293- return err
294- }
295- run .Repo = repo
296- }
256+ // Update the job's status and stopped time in the database.
257+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
258+ if err != nil {
259+ return cancelledJobs , err
260+ }
297261
298- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
299- return err
300- }
262+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
263+ if n == 0 {
264+ return cancelledJobs , errors .New ("job has changed, try again" )
265+ }
301266
302- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
303- var hasWaiting bool
304- for _ , v := range jobs {
305- id , job := v .Job ()
306- needs := job .Needs ()
307- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
308- return err
309- }
310- payload , _ := v .Marshal ()
311- status := StatusWaiting
312- if len (needs ) > 0 || run .NeedApproval {
313- status = StatusBlocked
314- } else {
315- hasWaiting = true
267+ cancelledJobs = append (cancelledJobs , job )
268+ // Continue with the next job.
269+ continue
316270 }
317- job .Name = util .EllipsisDisplayString (job .Name , 255 )
318- runJobs = append (runJobs , & ActionRunJob {
319- RunID : run .ID ,
320- RepoID : run .RepoID ,
321- OwnerID : run .OwnerID ,
322- CommitSHA : run .CommitSHA ,
323- IsForkPullRequest : run .IsForkPullRequest ,
324- Name : job .Name ,
325- WorkflowPayload : payload ,
326- JobID : id ,
327- Needs : needs ,
328- RunsOn : job .RunsOn (),
329- Status : status ,
330- })
331- }
332- if err := db .Insert (ctx , runJobs ); err != nil {
333- return err
334- }
335271
336- // if there is a job in the waiting status, increase tasks version.
337- if hasWaiting {
338- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
339- return err
272+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
273+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
274+ return cancelledJobs , err
340275 }
276+ cancelledJobs = append (cancelledJobs , job )
341277 }
342278
343- return committer .Commit ()
279+ // Return nil to indicate successful cancellation of all running and waiting jobs.
280+ return cancelledJobs , nil
344281}
345282
346283func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
@@ -432,7 +369,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
432369 }
433370 run .Repo = repo
434371 }
435- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
372+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
436373 return err
437374 }
438375 }
@@ -441,3 +378,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
441378}
442379
443380type ActionRunIndex db.ResourceIndex
381+
382+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
383+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
384+ return false , nil
385+ }
386+
387+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
388+ RepoID : actionRun .RepoID ,
389+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
390+ Status : []Status {StatusWaiting , StatusRunning },
391+ })
392+ if err != nil {
393+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
394+ }
395+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
396+
397+ return len (previousRuns ) > 0 , nil
398+ }
0 commit comments