@@ -22,7 +22,6 @@ import (
2222 "code.gitea.io/gitea/modules/util"
2323 webhook_module "code.gitea.io/gitea/modules/webhook"
2424
25- "github.com/nektos/act/pkg/jobparser"
2625 "xorm.io/builder"
2726)
2827
@@ -49,6 +48,8 @@ type ActionRun struct {
4948 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
5049 Status Status `xorm:"index"`
5150 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
51+ ConcurrencyGroup string `xorm:"index"`
52+ ConcurrencyCancel bool
5253 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5354 Started timeutil.TimeStamp
5455 Stopped timeutil.TimeStamp
@@ -170,7 +171,7 @@ func (run *ActionRun) IsSchedule() bool {
170171 return run .ScheduleID > 0
171172}
172173
173- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
174+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
174175 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
175176 NoAutoTime ().
176177 SetExpr ("num_action_runs" ,
@@ -227,121 +228,69 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
227228 return cancelledJobs , err
228229 }
229230
230- // Iterate over each job and attempt to cancel it.
231- for _ , job := range jobs {
232- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
233- status := job .Status
234- if status .IsDone () {
235- continue
236- }
231+ cjs , err := CancelJobs (ctx , jobs )
232+ if err != nil {
233+ return cancelledJobs , err
234+ }
235+ cancelledJobs = append (cancelledJobs , cjs ... )
236+ }
237237
238- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
239- if job .TaskID == 0 {
240- job .Status = StatusCancelled
241- job .Stopped = timeutil .TimeStampNow ()
238+ // Return nil to indicate successful cancellation of all running and waiting jobs.
239+ return cancelledJobs , nil
240+ }
242241
243- // Update the job's status and stopped time in the database.
244- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
245- if err != nil {
246- return cancelledJobs , err
247- }
242+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) ([]* ActionRunJob , error ) {
243+ cancelledJobs := make ([]* ActionRunJob , 0 , len (jobs ))
244+ // Iterate over each job and attempt to cancel it.
245+ for _ , job := range jobs {
246+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
247+ status := job .Status
248+ if status .IsDone () {
249+ continue
250+ }
248251
249- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again .
250- if n == 0 {
251- return cancelledJobs , errors . New ( " job has changed, try again" )
252- }
252+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it .
253+ if job . TaskID == 0 {
254+ job . Status = StatusCancelled
255+ job . Stopped = timeutil . TimeStampNow ()
253256
254- cancelledJobs = append (cancelledJobs , job )
255- // Continue with the next job.
256- continue
257+ // Update the job's status and stopped time in the database.
258+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
259+ if err != nil {
260+ return cancelledJobs , err
257261 }
258262
259- // If the job has an associated task, try to stop the task, effectively cancelling the job .
260- if err := StopTask ( ctx , job . TaskID , StatusCancelled ); err != nil {
261- return cancelledJobs , err
263+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again .
264+ if n == 0 {
265+ return cancelledJobs , errors . New ( "job has changed, try again" )
262266 }
267+
263268 cancelledJobs = append (cancelledJobs , job )
269+ // Continue with the next job.
270+ continue
264271 }
272+
273+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
274+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
275+ return cancelledJobs , err
276+ }
277+ cancelledJobs = append (cancelledJobs , job )
265278 }
266279
267280 // Return nil to indicate successful cancellation of all running and waiting jobs.
268281 return cancelledJobs , nil
269282}
270283
271- // InsertRun inserts a run
272- // The title will be cut off at 255 characters if it's longer than 255 characters.
273- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
274- ctx , committer , err := db .TxContext (ctx )
275- if err != nil {
276- return err
277- }
278- defer committer .Close ()
279-
280- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
284+ func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
285+ var run ActionRun
286+ has , err := db .GetEngine (ctx ).Where ("id=?" , id ).Get (& run )
281287 if err != nil {
282- return err
283- }
284- run .Index = index
285- run .Title = util .EllipsisDisplayString (run .Title , 255 )
286-
287- if err := db .Insert (ctx , run ); err != nil {
288- return err
289- }
290-
291- if run .Repo == nil {
292- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
293- if err != nil {
294- return err
295- }
296- run .Repo = repo
297- }
298-
299- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
300- return err
301- }
302-
303- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
304- var hasWaiting bool
305- for _ , v := range jobs {
306- id , job := v .Job ()
307- needs := job .Needs ()
308- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
309- return err
310- }
311- payload , _ := v .Marshal ()
312- status := StatusWaiting
313- if len (needs ) > 0 || run .NeedApproval {
314- status = StatusBlocked
315- } else {
316- hasWaiting = true
317- }
318- job .Name = util .EllipsisDisplayString (job .Name , 255 )
319- runJobs = append (runJobs , & ActionRunJob {
320- RunID : run .ID ,
321- RepoID : run .RepoID ,
322- OwnerID : run .OwnerID ,
323- CommitSHA : run .CommitSHA ,
324- IsForkPullRequest : run .IsForkPullRequest ,
325- Name : job .Name ,
326- WorkflowPayload : payload ,
327- JobID : id ,
328- Needs : needs ,
329- RunsOn : job .RunsOn (),
330- Status : status ,
331- })
332- }
333- if err := db .Insert (ctx , runJobs ); err != nil {
334- return err
335- }
336-
337- // if there is a job in the waiting status, increase tasks version.
338- if hasWaiting {
339- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
340- return err
341- }
288+ return nil , err
289+ } else if ! has {
290+ return nil , fmt .Errorf ("run with id %d: %w" , id , util .ErrNotExist )
342291 }
343292
344- return committer . Commit ()
293+ return & run , nil
345294}
346295
347296func GetRunByRepoAndID (ctx context.Context , repoID , runID int64 ) (* ActionRun , error ) {
@@ -426,7 +375,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426375 if err = run .LoadRepo (ctx ); err != nil {
427376 return err
428377 }
429- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
378+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
430379 return err
431380 }
432381 }
@@ -435,3 +384,55 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435384}
436385
437386type ActionRunIndex db.ResourceIndex
387+
388+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
389+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
390+ return false , nil
391+ }
392+
393+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
394+ RepoID : actionRun .RepoID ,
395+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
396+ Status : []Status {StatusWaiting , StatusRunning },
397+ })
398+ if err != nil {
399+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
400+ }
401+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
402+
403+ return len (previousRuns ) > 0 , nil
404+ }
405+
406+ func CancelPreviousJobsByRunConcurrency (ctx context.Context , actionRun * ActionRun ) ([]* ActionRunJob , error ) {
407+ var cancelledJobs []* ActionRunJob
408+
409+ if actionRun .ConcurrencyGroup != "" && actionRun .ConcurrencyCancel {
410+ // cancel previous runs in the same concurrency group
411+ runs , err := db .Find [ActionRun ](ctx , & FindRunOptions {
412+ RepoID : actionRun .RepoID ,
413+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
414+ Status : []Status {StatusRunning , StatusWaiting , StatusBlocked },
415+ })
416+ if err != nil {
417+ return cancelledJobs , fmt .Errorf ("find runs: %w" , err )
418+ }
419+ for _ , run := range runs {
420+ if run .ID == actionRun .ID {
421+ continue
422+ }
423+ jobs , err := db .Find [ActionRunJob ](ctx , FindRunJobOptions {
424+ RunID : run .ID ,
425+ })
426+ if err != nil {
427+ return cancelledJobs , fmt .Errorf ("find run %d jobs: %w" , run .ID , err )
428+ }
429+ cjs , err := CancelJobs (ctx , jobs )
430+ if err != nil {
431+ return cancelledJobs , fmt .Errorf ("cancel run %d jobs: %w" , run .ID , err )
432+ }
433+ cancelledJobs = append (cancelledJobs , cjs ... )
434+ }
435+ }
436+
437+ return cancelledJobs , nil
438+ }
0 commit comments