@@ -20,7 +20,6 @@ import (
2020 "code.gitea.io/gitea/modules/util"
2121 webhook_module "code.gitea.io/gitea/modules/webhook"
2222
23- "github.com/nektos/act/pkg/jobparser"
2423 "xorm.io/builder"
2524)
2625
@@ -47,6 +46,8 @@ type ActionRun struct {
4746 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4847 Status Status `xorm:"index"`
4948 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
49+ ConcurrencyGroup string
50+ ConcurrencyCancel bool
5051 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5152 Started timeutil.TimeStamp
5253 Stopped timeutil.TimeStamp
@@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
168169 return run .ScheduleID > 0
169170}
170171
171- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
172+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
172173 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
173174 SetExpr ("num_action_runs" ,
174175 builder .Select ("count(*)" ).From ("action_run" ).
@@ -196,13 +197,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
196197// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
197198func CancelPreviousJobs (ctx context.Context , repoID int64 , ref , workflowID string , event webhook_module.HookEventType ) error {
198199 // Find all runs in the specified repository, reference, and workflow with non-final status
199- runs , total , err := db . FindAndCount [ ActionRun ]( ctx , FindRunOptions {
200+ opts := & FindRunOptions {
200201 RepoID : repoID ,
201202 Ref : ref ,
202203 WorkflowID : workflowID ,
203204 TriggerEvent : event ,
204205 Status : []Status {StatusRunning , StatusWaiting , StatusBlocked },
205- })
206+ }
207+ return CancelPreviousJobsWithOpts (ctx , opts )
208+ }
209+
210+ // CancelPreviousJobs cancels all previous jobs with opts
211+ func CancelPreviousJobsWithOpts (ctx context.Context , opts * FindRunOptions ) error {
212+ // Find all runs by opts
213+ runs , total , err := db .FindAndCount [ActionRun ](ctx , opts )
206214 if err != nil {
207215 return err
208216 }
@@ -222,119 +230,50 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
222230 return err
223231 }
224232
225- // Iterate over each job and attempt to cancel it.
226- for _ , job := range jobs {
227- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
228- status := job .Status
229- if status .IsDone () {
230- continue
231- }
232-
233- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
234- if job .TaskID == 0 {
235- job .Status = StatusCancelled
236- job .Stopped = timeutil .TimeStampNow ()
237-
238- // Update the job's status and stopped time in the database.
239- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
240- if err != nil {
241- return err
242- }
243-
244- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
245- if n == 0 {
246- return fmt .Errorf ("job has changed, try again" )
247- }
248-
249- // Continue with the next job.
250- continue
251- }
252-
253- // If the job has an associated task, try to stop the task, effectively cancelling the job.
254- if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
255- return err
256- }
233+ if err := CancelJobs (ctx , jobs ); err != nil {
234+ return err
257235 }
258236 }
259237
260238 // Return nil to indicate successful cancellation of all running and waiting jobs.
261239 return nil
262240}
263241
264- // InsertRun inserts a run
265- // The title will be cut off at 255 characters if it's longer than 255 characters.
266- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
267- ctx , committer , err := db .TxContext (ctx )
268- if err != nil {
269- return err
270- }
271- defer committer .Close ()
272-
273- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
274- if err != nil {
275- return err
276- }
277- run .Index = index
278- run .Title = util .EllipsisDisplayString (run .Title , 255 )
242+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) error {
243+ // Iterate over each job and attempt to cancel it.
244+ for _ , job := range jobs {
245+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
246+ status := job .Status
247+ if status .IsDone () {
248+ continue
249+ }
279250
280- if err := db .Insert (ctx , run ); err != nil {
281- return err
282- }
251+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
252+ if job .TaskID == 0 {
253+ job .Status = StatusCancelled
254+ job .Stopped = timeutil .TimeStampNow ()
283255
284- if run .Repo == nil {
285- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
286- if err != nil {
287- return err
288- }
289- run .Repo = repo
290- }
256+ // Update the job's status and stopped time in the database.
257+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
258+ if err != nil {
259+ return err
260+ }
291261
292- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
293- return err
294- }
262+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
263+ if n == 0 {
264+ return fmt .Errorf ("job has changed, try again" )
265+ }
295266
296- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
297- var hasWaiting bool
298- for _ , v := range jobs {
299- id , job := v .Job ()
300- needs := job .Needs ()
301- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
302- return err
267+ // Continue with the next job.
268+ continue
303269 }
304- payload , _ := v .Marshal ()
305- status := StatusWaiting
306- if len (needs ) > 0 || run .NeedApproval {
307- status = StatusBlocked
308- } else {
309- hasWaiting = true
310- }
311- job .Name = util .EllipsisDisplayString (job .Name , 255 )
312- runJobs = append (runJobs , & ActionRunJob {
313- RunID : run .ID ,
314- RepoID : run .RepoID ,
315- OwnerID : run .OwnerID ,
316- CommitSHA : run .CommitSHA ,
317- IsForkPullRequest : run .IsForkPullRequest ,
318- Name : job .Name ,
319- WorkflowPayload : payload ,
320- JobID : id ,
321- Needs : needs ,
322- RunsOn : job .RunsOn (),
323- Status : status ,
324- })
325- }
326- if err := db .Insert (ctx , runJobs ); err != nil {
327- return err
328- }
329270
330- // if there is a job in the waiting status, increase tasks version.
331- if hasWaiting {
332- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
271+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
272+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
333273 return err
334274 }
335275 }
336-
337- return committer .Commit ()
276+ return nil
338277}
339278
340279func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
@@ -426,7 +365,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
426365 }
427366 run .Repo = repo
428367 }
429- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
368+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
430369 return err
431370 }
432371 }
@@ -435,3 +374,38 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
435374}
436375
437376type ActionRunIndex db.ResourceIndex
377+
378+ func CancelConcurrentJobs (ctx context.Context , actionRunJob * ActionRunJob ) error {
379+ // cancel previous jobs in the same concurrency group
380+ previousJobs , err := db .Find [ActionRunJob ](ctx , FindRunJobOptions {
381+ RepoID : actionRunJob .RepoID ,
382+ ConcurrencyGroup : actionRunJob .ConcurrencyGroup ,
383+ Statuses : []Status {
384+ StatusRunning ,
385+ StatusWaiting ,
386+ StatusBlocked ,
387+ },
388+ })
389+ if err != nil {
390+ return fmt .Errorf ("find previous jobs: %w" , err )
391+ }
392+
393+ return CancelJobs (ctx , previousJobs )
394+ }
395+
396+ func ShouldJobBeBlockedByConcurrentJobs (ctx context.Context , actionRunJob * ActionRunJob ) (bool , error ) {
397+ if actionRunJob .ConcurrencyCancel {
398+ return false , CancelConcurrentJobs (ctx , actionRunJob )
399+ }
400+
401+ concurrentJobsNum , err := db .Count [ActionRunJob ](ctx , FindRunJobOptions {
402+ RepoID : actionRunJob .RepoID ,
403+ ConcurrencyGroup : actionRunJob .ConcurrencyGroup ,
404+ Statuses : []Status {StatusRunning , StatusWaiting },
405+ })
406+ if err != nil {
407+ return false , fmt .Errorf ("count waiting jobs: %w" , err )
408+ }
409+
410+ return concurrentJobsNum > 0 , nil
411+ }
0 commit comments