@@ -21,7 +21,6 @@ import (
2121 "code.gitea.io/gitea/modules/util"
2222 webhook_module "code.gitea.io/gitea/modules/webhook"
2323
24- "github.com/nektos/act/pkg/jobparser"
2524 "xorm.io/builder"
2625)
2726
@@ -48,6 +47,8 @@ type ActionRun struct {
4847 TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
4948 Status Status `xorm:"index"`
5049 Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
50+ ConcurrencyGroup string `xorm:"index"`
51+ ConcurrencyCancel bool
5152 // Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5253 Started timeutil.TimeStamp
5354 Stopped timeutil.TimeStamp
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170 return run .ScheduleID > 0
170171}
171172
172- func updateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
173+ func UpdateRepoRunsNumbers (ctx context.Context , repo * repo_model.Repository ) error {
173174 _ , err := db .GetEngine (ctx ).ID (repo .ID ).
174175 SetExpr ("num_action_runs" ,
175176 builder .Select ("count(*)" ).From ("action_run" ).
@@ -225,121 +226,57 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
225226 return cancelledJobs , err
226227 }
227228
228- // Iterate over each job and attempt to cancel it.
229- for _ , job := range jobs {
230- // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
231- status := job .Status
232- if status .IsDone () {
233- continue
234- }
235-
236- // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
237- if job .TaskID == 0 {
238- job .Status = StatusCancelled
239- job .Stopped = timeutil .TimeStampNow ()
240-
241- // Update the job's status and stopped time in the database.
242- n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
243- if err != nil {
244- return cancelledJobs , err
245- }
246-
247- // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
248- if n == 0 {
249- return cancelledJobs , errors .New ("job has changed, try again" )
250- }
251-
252- cancelledJobs = append (cancelledJobs , job )
253- // Continue with the next job.
254- continue
255- }
256-
257- // If the job has an associated task, try to stop the task, effectively cancelling the job.
258- if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
259- return cancelledJobs , err
260- }
261- cancelledJobs = append (cancelledJobs , job )
229+ cjs , err := CancelJobs (ctx , jobs )
230+ if err != nil {
231+ return cancelledJobs , err
262232 }
233+ cancelledJobs = append (cancelledJobs , cjs ... )
263234 }
264235
265236 // Return nil to indicate successful cancellation of all running and waiting jobs.
266237 return cancelledJobs , nil
267238}
268239
269- // InsertRun inserts a run
270- // The title will be cut off at 255 characters if it's longer than 255 characters.
271- func InsertRun (ctx context.Context , run * ActionRun , jobs []* jobparser.SingleWorkflow ) error {
272- ctx , committer , err := db .TxContext (ctx )
273- if err != nil {
274- return err
275- }
276- defer committer .Close ()
277-
278- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
279- if err != nil {
280- return err
281- }
282- run .Index = index
283- run .Title = util .EllipsisDisplayString (run .Title , 255 )
240+ func CancelJobs (ctx context.Context , jobs []* ActionRunJob ) ([]* ActionRunJob , error ) {
241+ cancelledJobs := make ([]* ActionRunJob , 0 , len (jobs ))
242+ // Iterate over each job and attempt to cancel it.
243+ for _ , job := range jobs {
244+ // Skip jobs that are already in a terminal state (completed, cancelled, etc.).
245+ status := job .Status
246+ if status .IsDone () {
247+ continue
248+ }
284249
285- if err := db .Insert (ctx , run ); err != nil {
286- return err
287- }
250+ // If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
251+ if job .TaskID == 0 {
252+ job .Status = StatusCancelled
253+ job .Stopped = timeutil .TimeStampNow ()
288254
289- if run .Repo == nil {
290- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
291- if err != nil {
292- return err
293- }
294- run .Repo = repo
295- }
255+ // Update the job's status and stopped time in the database.
256+ n , err := UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
257+ if err != nil {
258+ return cancelledJobs , err
259+ }
296260
297- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
298- return err
299- }
261+ // If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
262+ if n == 0 {
263+ return cancelledJobs , errors .New ("job has changed, try again" )
264+ }
300265
301- runJobs := make ([]* ActionRunJob , 0 , len (jobs ))
302- var hasWaiting bool
303- for _ , v := range jobs {
304- id , job := v .Job ()
305- needs := job .Needs ()
306- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
307- return err
308- }
309- payload , _ := v .Marshal ()
310- status := StatusWaiting
311- if len (needs ) > 0 || run .NeedApproval {
312- status = StatusBlocked
313- } else {
314- hasWaiting = true
266+ cancelledJobs = append (cancelledJobs , job )
267+ // Continue with the next job.
268+ continue
315269 }
316- job .Name = util .EllipsisDisplayString (job .Name , 255 )
317- runJobs = append (runJobs , & ActionRunJob {
318- RunID : run .ID ,
319- RepoID : run .RepoID ,
320- OwnerID : run .OwnerID ,
321- CommitSHA : run .CommitSHA ,
322- IsForkPullRequest : run .IsForkPullRequest ,
323- Name : job .Name ,
324- WorkflowPayload : payload ,
325- JobID : id ,
326- Needs : needs ,
327- RunsOn : job .RunsOn (),
328- Status : status ,
329- })
330- }
331- if err := db .Insert (ctx , runJobs ); err != nil {
332- return err
333- }
334270
335- // if there is a job in the waiting status, increase tasks version.
336- if hasWaiting {
337- if err := IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
338- return err
271+ // If the job has an associated task, try to stop the task, effectively cancelling the job.
272+ if err := StopTask (ctx , job .TaskID , StatusCancelled ); err != nil {
273+ return cancelledJobs , err
339274 }
275+ cancelledJobs = append (cancelledJobs , job )
340276 }
341277
342- return committer .Commit ()
278+ // Return nil to indicate successful cancellation of all running and waiting jobs.
279+ return cancelledJobs , nil
343280}
344281
345282func GetRunByID (ctx context.Context , id int64 ) (* ActionRun , error ) {
@@ -431,7 +368,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
431368 }
432369 run .Repo = repo
433370 }
434- if err := updateRepoRunsNumbers (ctx , run .Repo ); err != nil {
371+ if err := UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
435372 return err
436373 }
437374 }
@@ -440,3 +377,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
440377}
441378
442379type ActionRunIndex db.ResourceIndex
380+
381+ func ShouldBlockRunByConcurrency (ctx context.Context , actionRun * ActionRun ) (bool , error ) {
382+ if actionRun .ConcurrencyGroup == "" || actionRun .ConcurrencyCancel {
383+ return false , nil
384+ }
385+
386+ concurrentRuns , err := db .Find [ActionRun ](ctx , & FindRunOptions {
387+ RepoID : actionRun .RepoID ,
388+ ConcurrencyGroup : actionRun .ConcurrencyGroup ,
389+ Status : []Status {StatusWaiting , StatusRunning },
390+ })
391+ if err != nil {
392+ return false , fmt .Errorf ("find running and waiting runs: %w" , err )
393+ }
394+ previousRuns := slices .DeleteFunc (concurrentRuns , func (r * ActionRun ) bool { return r .ID == actionRun .ID })
395+
396+ return len (previousRuns ) > 0 , nil
397+ }
0 commit comments