@@ -22,7 +22,6 @@ import (
2222 "code.gitea.io/gitea/modules/util"
2323 webhook_module "code.gitea.io/gitea/modules/webhook"
2424
25- "github.com/nektos/act/pkg/jobparser"
2625 "xorm.io/builder"
2726)
2827
@@ -49,6 +48,8 @@ type ActionRun struct {
4948 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
5049 Status Status `xorm:"index"`
5150 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
51+ ConcurrencyGroup string `xorm:"index"`
52+ ConcurrencyCancel bool
5253 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5354 Started timeutil.TimeStamp
5455 Stopped timeutil.TimeStamp
@@ -170,7 +171,7 @@ func (run *ActionRun) IsSchedule() bool {
170171 return run .ScheduleID > 0
171172}
172173
173- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
174+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
174175 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
175176 NoAutoTime ().
176177 SetExpr ("num_action_runs" ,
@@ -227,121 +228,69 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
227228 return cancelledJobs , err
228229 }
229230
230- // Iterate over each job and attempt to cancel it.
231- for _ , job := range jobs {
232- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
233- status := job .Status
234- if status .IsDone () {
235- continue
236- }
237-
238- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
239- if job .TaskID == 0 {
240- job .Status = StatusCancelled
241- job .Stopped = timeutil .TimeStampNow ()
242-
243- // Update the job's status and stopped time in the database.
244- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
245- if err != nil {
246- return cancelledJobs , err
247- }
248-
249- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
250- if n == 0 {
251- return cancelledJobs , errors .New ("job has changed, try again" )
252- }
253-
254- cancelledJobs = append (cancelledJobs , job )
255- // Continue with the next job.
256- continue
257- }
258-
259- // If the job has an associated task, try to stop the task, effectively cancelling the job.
260- if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
261- return cancelledJobs , err
262- }
263- cancelledJobs = append (cancelledJobs , job )
231+ cjs , err := CancelJobs (ctx , jobs )
232+ if err != nil {
233+ return cancelledJobs , err
264234 }
235+ cancelledJobs = append (cancelledJobs , cjs ... )
265236 }
266237
267238 // Return nil to indicate successful cancellation of all running and waiting jobs.
268239 return cancelledJobs , nil
269240}
270241
271- // InsertRun inserts a run
272- // The title will be cut off at 255 characters if it's longer than 255 characters.
273- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
274- ctx , committer , err := db .TxContext (ctx )
275- if err != nil {
276- return err
277- }
278- defer committer .Close ()
279-
280- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
281- if err != nil {
282- return err
283- }
284- run .Index = index
285- run .Title = util .EllipsisDisplayString (run .Title , 255 )
242+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) ([]* ActionRunJob , error ) {
243+ cancelledJobs := make ([]* ActionRunJob , 0 , len (jobs ))
244+ // Iterate over each job and attempt to cancel it.
245+ for _ , job := range jobs {
246+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
247+ status := job .Status
248+ if status .IsDone () {
249+ continue
250+ }
286251
287- if err := db .Insert (ctx , run ); err != nil {
288- return err
289- }
252+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
253+ if job .TaskID == 0 {
254+ job .Status = StatusCancelled
255+ job .Stopped = timeutil .TimeStampNow ()
290256
291- if run .Repo == nil {
292- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
293- if err != nil {
294- return err
295- }
296- run .Repo = repo
297- }
257+ // Update the job's status and stopped time in the database.
258+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
259+ if err != nil {
260+ return cancelledJobs , err
261+ }
298262
299- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
300- return err
301- }
263+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
264+ if n == 0 {
265+ return cancelledJobs , errors .New ("job has changed, try again" )
266+ }
302267
303- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
304- var hasWaiting bool
305- for _ , v := range jobs {
306- id , job := v .Job ()
307- needs := job .Needs ()
308- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
309- return err
268+ cancelledJobs = append (cancelledJobs , job )
269+ // Continue with the next job.
270+ continue
310271 }
311- payload , _ := v .Marshal ()
312- status := StatusWaiting
313- if len (needs ) > 0 || run .NeedApproval {
314- status = StatusBlocked
315- } else {
316- hasWaiting = true
272+
273+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
274+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
275+ return cancelledJobs , err
317276 }
318- job .Name = util .EllipsisDisplayString (job .Name , 255 )
319- runJobs = append (runJobs , & ActionRunJob {
320- RunID : run .ID ,
321- RepoID : run .RepoID ,
322- OwnerID : run .OwnerID ,
323- CommitSHA : run .CommitSHA ,
324- IsForkPullRequest : run .IsForkPullRequest ,
325- Name : job .Name ,
326- WorkflowPayload : payload ,
327- JobID : id ,
328- Needs : needs ,
329- RunsOn : job .RunsOn (),
330- Status : status ,
331- })
332- }
333- if err := db .Insert (ctx , runJobs ); err != nil {
334- return err
277+ cancelledJobs = append (cancelledJobs , job )
335278 }
336279
337- // if there is a job in the waiting status, increase tasks version.
338- if hasWaiting {
339- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
340- return err
341- }
280+ // Return nil to indicate successful cancellation of all running and waiting jobs.
281+ return cancelledJobs , nil
282+ }
283+
284+ func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
285+ var run ActionRun
286+ has , err := db .GetEngine (ctx ).Where ("id=?" , id ).Get (& run )
287+ if err != nil {
288+ return nil , err
289+ } else if ! has {
290+ return nil , fmt .Errorf ("run with id %d: %w" , id , util .ErrNotExist )
342291 }
343292
344- return committer . Commit ()
293+ return & run , nil
345294}
346295
347296func GetRunByRepoAndID (ctx context.Context , repoID , runID int64 ) (* ActionRun , error ) {
@@ -426,7 +375,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426375 if err = run .LoadRepo (ctx ); err != nil {
427376 return err
428377 }
429- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
378+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
430379 return err
431380 }
432381 }
@@ -435,3 +384,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435384}
436385
437386type ActionRunIndex db.ResourceIndex
387+
388+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
389+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
390+ return false , nil
391+ }
392+
393+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
394+ RepoID : actionRun .RepoID ,
395+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
396+ Status : []Status {StatusWaiting , StatusRunning },
397+ })
398+ if err != nil {
399+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
400+ }
401+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
402+
403+ return len (previousRuns ) > 0 , nil
404+ }
0 commit comments