@@ -21,7 +21,6 @@ import (
2121	"code.gitea.io/gitea/modules/util" 
2222	webhook_module "code.gitea.io/gitea/modules/webhook" 
2323
24- 	"github.com/nektos/act/pkg/jobparser" 
2524	"xorm.io/builder" 
2625)
2726
@@ -48,6 +47,8 @@ type ActionRun struct {
4847	TriggerEvent       string                        // the trigger event defined in the `on` configuration of the triggered workflow 
4948	Status             Status                        `xorm:"index"` 
5049	Version            int                           `xorm:"version default 0"`  // Status could be updated concomitantly, so an optimistic lock is needed 
50+ 	ConcurrencyGroup   string                        `xorm:"index"` 
51+ 	ConcurrencyCancel  bool 
5152	// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0 
5253	Started  timeutil.TimeStamp 
5354	Stopped  timeutil.TimeStamp 
@@ -169,7 +170,7 @@ func (run *ActionRun) IsSchedule() bool {
169170	return  run .ScheduleID  >  0 
170171}
171172
172- func  updateRepoRunsNumbers (ctx  context.Context , repo  * repo_model.Repository ) error  {
173+ func  UpdateRepoRunsNumbers (ctx  context.Context , repo  * repo_model.Repository ) error  {
173174	_ , err  :=  db .GetEngine (ctx ).ID (repo .ID ).
174175		NoAutoTime ().
175176		SetExpr ("num_action_runs" ,
@@ -226,121 +227,57 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
226227			return  cancelledJobs , err 
227228		}
228229
229- 		// Iterate over each job and attempt to cancel it. 
230- 		for  _ , job  :=  range  jobs  {
231- 			// Skip jobs that are already in a terminal state (completed, cancelled, etc.). 
232- 			status  :=  job .Status 
233- 			if  status .IsDone () {
234- 				continue 
235- 			}
236- 
237- 			// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. 
238- 			if  job .TaskID  ==  0  {
239- 				job .Status  =  StatusCancelled 
240- 				job .Stopped  =  timeutil .TimeStampNow ()
241- 
242- 				// Update the job's status and stopped time in the database. 
243- 				n , err  :=  UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
244- 				if  err  !=  nil  {
245- 					return  cancelledJobs , err 
246- 				}
247- 
248- 				// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. 
249- 				if  n  ==  0  {
250- 					return  cancelledJobs , errors .New ("job has changed, try again" )
251- 				}
252- 
253- 				cancelledJobs  =  append (cancelledJobs , job )
254- 				// Continue with the next job. 
255- 				continue 
256- 			}
257- 
258- 			// If the job has an associated task, try to stop the task, effectively cancelling the job. 
259- 			if  err  :=  StopTask (ctx , job .TaskID , StatusCancelled ); err  !=  nil  {
260- 				return  cancelledJobs , err 
261- 			}
262- 			cancelledJobs  =  append (cancelledJobs , job )
230+ 		cjs , err  :=  CancelJobs (ctx , jobs )
231+ 		if  err  !=  nil  {
232+ 			return  cancelledJobs , err 
263233		}
234+ 		cancelledJobs  =  append (cancelledJobs , cjs ... )
264235	}
265236
266237	// Return nil to indicate successful cancellation of all running and waiting jobs. 
267238	return  cancelledJobs , nil 
268239}
269240
270- // InsertRun inserts a run 
271- // The title will be cut off at 255 characters if it's longer than 255 characters. 
272- func  InsertRun (ctx  context.Context , run  * ActionRun , jobs  []* jobparser.SingleWorkflow ) error  {
273- 	ctx , committer , err  :=  db .TxContext (ctx )
274- 	if  err  !=  nil  {
275- 		return  err 
276- 	}
277- 	defer  committer .Close ()
278- 
279- 	index , err  :=  db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
280- 	if  err  !=  nil  {
281- 		return  err 
282- 	}
283- 	run .Index  =  index 
284- 	run .Title  =  util .EllipsisDisplayString (run .Title , 255 )
241+ func  CancelJobs (ctx  context.Context , jobs  []* ActionRunJob ) ([]* ActionRunJob , error ) {
242+ 	cancelledJobs  :=  make ([]* ActionRunJob , 0 , len (jobs ))
243+ 	// Iterate over each job and attempt to cancel it. 
244+ 	for  _ , job  :=  range  jobs  {
245+ 		// Skip jobs that are already in a terminal state (completed, cancelled, etc.). 
246+ 		status  :=  job .Status 
247+ 		if  status .IsDone () {
248+ 			continue 
249+ 		}
285250
286- 	if  err  :=  db .Insert (ctx , run ); err  !=  nil  {
287- 		return  err 
288- 	}
251+ 		// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it. 
252+ 		if  job .TaskID  ==  0  {
253+ 			job .Status  =  StatusCancelled 
254+ 			job .Stopped  =  timeutil .TimeStampNow ()
289255
290- 	if  run .Repo  ==  nil  {
291- 		repo , err  :=  repo_model .GetRepositoryByID (ctx , run .RepoID )
292- 		if  err  !=  nil  {
293- 			return  err 
294- 		}
295- 		run .Repo  =  repo 
296- 	}
256+ 			// Update the job's status and stopped time in the database. 
257+ 			n , err  :=  UpdateRunJob (ctx , job , builder.Eq {"task_id" : 0 }, "status" , "stopped" )
258+ 			if  err  !=  nil  {
259+ 				return  cancelledJobs , err 
260+ 			}
297261
298- 	if  err  :=  updateRepoRunsNumbers (ctx , run .Repo ); err  !=  nil  {
299- 		return  err 
300- 	}
262+ 			// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again. 
263+ 			if  n  ==  0  {
264+ 				return  cancelledJobs , errors .New ("job has changed, try again" )
265+ 			}
301266
302- 	runJobs  :=  make ([]* ActionRunJob , 0 , len (jobs ))
303- 	var  hasWaiting  bool 
304- 	for  _ , v  :=  range  jobs  {
305- 		id , job  :=  v .Job ()
306- 		needs  :=  job .Needs ()
307- 		if  err  :=  v .SetJob (id , job .EraseNeeds ()); err  !=  nil  {
308- 			return  err 
309- 		}
310- 		payload , _  :=  v .Marshal ()
311- 		status  :=  StatusWaiting 
312- 		if  len (needs ) >  0  ||  run .NeedApproval  {
313- 			status  =  StatusBlocked 
314- 		} else  {
315- 			hasWaiting  =  true 
267+ 			cancelledJobs  =  append (cancelledJobs , job )
268+ 			// Continue with the next job. 
269+ 			continue 
316270		}
317- 		job .Name  =  util .EllipsisDisplayString (job .Name , 255 )
318- 		runJobs  =  append (runJobs , & ActionRunJob {
319- 			RunID :             run .ID ,
320- 			RepoID :            run .RepoID ,
321- 			OwnerID :           run .OwnerID ,
322- 			CommitSHA :         run .CommitSHA ,
323- 			IsForkPullRequest : run .IsForkPullRequest ,
324- 			Name :              job .Name ,
325- 			WorkflowPayload :   payload ,
326- 			JobID :             id ,
327- 			Needs :             needs ,
328- 			RunsOn :            job .RunsOn (),
329- 			Status :            status ,
330- 		})
331- 	}
332- 	if  err  :=  db .Insert (ctx , runJobs ); err  !=  nil  {
333- 		return  err 
334- 	}
335271
336- 	// if there is a job in the waiting status, increase tasks version. 
337- 	if  hasWaiting  {
338- 		if  err  :=  IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err  !=  nil  {
339- 			return  err 
272+ 		// If the job has an associated task, try to stop the task, effectively cancelling the job. 
273+ 		if  err  :=  StopTask (ctx , job .TaskID , StatusCancelled ); err  !=  nil  {
274+ 			return  cancelledJobs , err 
340275		}
276+ 		cancelledJobs  =  append (cancelledJobs , job )
341277	}
342278
343- 	return  committer .Commit ()
279+ 	// Return nil to indicate successful cancellation of all running and waiting jobs. 
280+ 	return  cancelledJobs , nil 
344281}
345282
346283func  GetRunByID (ctx  context.Context , id  int64 ) (* ActionRun , error ) {
@@ -432,7 +369,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
432369			}
433370			run .Repo  =  repo 
434371		}
435- 		if  err  :=  updateRepoRunsNumbers (ctx , run .Repo ); err  !=  nil  {
372+ 		if  err  :=  UpdateRepoRunsNumbers (ctx , run .Repo ); err  !=  nil  {
436373			return  err 
437374		}
438375	}
@@ -441,3 +378,21 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
441378}
442379
443380type  ActionRunIndex  db.ResourceIndex 
381+ 
382+ func  ShouldBlockRunByConcurrency (ctx  context.Context , actionRun  * ActionRun ) (bool , error ) {
383+ 	if  actionRun .ConcurrencyGroup  ==  ""  ||  actionRun .ConcurrencyCancel  {
384+ 		return  false , nil 
385+ 	}
386+ 
387+ 	concurrentRuns , err  :=  db .Find [ActionRun ](ctx , & FindRunOptions {
388+ 		RepoID :           actionRun .RepoID ,
389+ 		ConcurrencyGroup : actionRun .ConcurrencyGroup ,
390+ 		Status :           []Status {StatusWaiting , StatusRunning },
391+ 	})
392+ 	if  err  !=  nil  {
393+ 		return  false , fmt .Errorf ("find running and waiting runs: %w" , err )
394+ 	}
395+ 	previousRuns  :=  slices .DeleteFunc (concurrentRuns , func (r  * ActionRun ) bool  { return  r .ID  ==  actionRun .ID  })
396+ 
397+ 	return  len (previousRuns ) >  0 , nil 
398+ }
0 commit comments