Skip to content

Commit 40f71bc

Browse files
Zettat123ChristopherHXwxiaoguang
authored
Support Actions concurrency syntax (go-gitea#32751)
Fix go-gitea#24769 Fix go-gitea#32662 Fix go-gitea#33260 Depends on https://gitea.com/gitea/act/pulls/124 - https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions#concurrency ## ⚠️ BREAKING ⚠️ This PR removes the auto-cancellation feature added by go-gitea#25716. Users need to manually add `concurrency` to workflows to control concurrent workflows or jobs. --------- Signed-off-by: Zettat123 <[email protected]> Co-authored-by: Christopher Homberger <[email protected]> Co-authored-by: wxiaoguang <[email protected]>
1 parent 327d0a7 commit 40f71bc

File tree

17 files changed

+2645
-274
lines changed

17 files changed

+2645
-274
lines changed

models/actions/run.go

Lines changed: 101 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@ import (
1616
user_model "code.gitea.io/gitea/models/user"
1717
"code.gitea.io/gitea/modules/git"
1818
"code.gitea.io/gitea/modules/json"
19+
"code.gitea.io/gitea/modules/log"
1920
"code.gitea.io/gitea/modules/setting"
2021
api "code.gitea.io/gitea/modules/structs"
2122
"code.gitea.io/gitea/modules/timeutil"
2223
"code.gitea.io/gitea/modules/util"
2324
webhook_module "code.gitea.io/gitea/modules/webhook"
2425

25-
"github.com/nektos/act/pkg/jobparser"
2626
"xorm.io/builder"
2727
)
2828

2929
// ActionRun represents a run of a workflow file
3030
type ActionRun struct {
3131
ID int64
3232
Title string
33-
RepoID int64 `xorm:"index unique(repo_index)"`
33+
RepoID int64 `xorm:"unique(repo_index) index(repo_concurrency)"`
3434
Repo *repo_model.Repository `xorm:"-"`
3535
OwnerID int64 `xorm:"index"`
3636
WorkflowID string `xorm:"index"` // the name of workflow file
@@ -49,6 +49,9 @@ type ActionRun struct {
4949
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
5050
Status Status `xorm:"index"`
5151
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
52+
RawConcurrency string // raw concurrency
53+
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"`
54+
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`
5255
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5356
Started timeutil.TimeStamp
5457
Stopped timeutil.TimeStamp
@@ -190,7 +193,7 @@ func (run *ActionRun) IsSchedule() bool {
190193
return run.ScheduleID > 0
191194
}
192195

193-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
196+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
194197
_, err := db.GetEngine(ctx).ID(repo.ID).
195198
NoAutoTime().
196199
SetExpr("num_action_runs",
@@ -247,116 +250,62 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
247250
return cancelledJobs, err
248251
}
249252

250-
// Iterate over each job and attempt to cancel it.
251-
for _, job := range jobs {
252-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
253-
status := job.Status
254-
if status.IsDone() {
255-
continue
256-
}
257-
258-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
259-
if job.TaskID == 0 {
260-
job.Status = StatusCancelled
261-
job.Stopped = timeutil.TimeStampNow()
262-
263-
// Update the job's status and stopped time in the database.
264-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
265-
if err != nil {
266-
return cancelledJobs, err
267-
}
268-
269-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
270-
if n == 0 {
271-
return cancelledJobs, errors.New("job has changed, try again")
272-
}
273-
274-
cancelledJobs = append(cancelledJobs, job)
275-
// Continue with the next job.
276-
continue
277-
}
278-
279-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
280-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
281-
return cancelledJobs, err
282-
}
283-
cancelledJobs = append(cancelledJobs, job)
253+
cjs, err := CancelJobs(ctx, jobs)
254+
if err != nil {
255+
return cancelledJobs, err
284256
}
257+
cancelledJobs = append(cancelledJobs, cjs...)
285258
}
286259

287260
// Return nil to indicate successful cancellation of all running and waiting jobs.
288261
return cancelledJobs, nil
289262
}
290263

291-
// InsertRun inserts a run
292-
// The title will be cut off at 255 characters if it's longer than 255 characters.
293-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
294-
return db.WithTx(ctx, func(ctx context.Context) error {
295-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
296-
if err != nil {
297-
return err
264+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
265+
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
266+
// Iterate over each job and attempt to cancel it.
267+
for _, job := range jobs {
268+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
269+
status := job.Status
270+
if status.IsDone() {
271+
continue
298272
}
299-
run.Index = index
300-
run.Title = util.EllipsisDisplayString(run.Title, 255)
301273

302-
if err := db.Insert(ctx, run); err != nil {
303-
return err
304-
}
274+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
275+
if job.TaskID == 0 {
276+
job.Status = StatusCancelled
277+
job.Stopped = timeutil.TimeStampNow()
305278

306-
if run.Repo == nil {
307-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
279+
// Update the job's status and stopped time in the database.
280+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
308281
if err != nil {
309-
return err
282+
return cancelledJobs, err
310283
}
311-
run.Repo = repo
312-
}
313284

314-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
315-
return err
285+
// If the update affected 0 rows, it means the job has changed in the meantime
286+
if n == 0 {
287+
log.Error("Failed to cancel job %d because it has changed", job.ID)
288+
continue
289+
}
290+
291+
cancelledJobs = append(cancelledJobs, job)
292+
// Continue with the next job.
293+
continue
316294
}
317295

318-
runJobs := make([]*ActionRunJob, 0, len(jobs))
319-
var hasWaiting bool
320-
for _, v := range jobs {
321-
id, job := v.Job()
322-
needs := job.Needs()
323-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
324-
return err
325-
}
326-
payload, _ := v.Marshal()
327-
status := StatusWaiting
328-
if len(needs) > 0 || run.NeedApproval {
329-
status = StatusBlocked
330-
} else {
331-
hasWaiting = true
332-
}
333-
job.Name = util.EllipsisDisplayString(job.Name, 255)
334-
runJobs = append(runJobs, &ActionRunJob{
335-
RunID: run.ID,
336-
RepoID: run.RepoID,
337-
OwnerID: run.OwnerID,
338-
CommitSHA: run.CommitSHA,
339-
IsForkPullRequest: run.IsForkPullRequest,
340-
Name: job.Name,
341-
WorkflowPayload: payload,
342-
JobID: id,
343-
Needs: needs,
344-
RunsOn: job.RunsOn(),
345-
Status: status,
346-
})
296+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
297+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
298+
return cancelledJobs, err
347299
}
348-
if err := db.Insert(ctx, runJobs); err != nil {
349-
return err
300+
updatedJob, err := GetRunJobByID(ctx, job.ID)
301+
if err != nil {
302+
return cancelledJobs, fmt.Errorf("get job: %w", err)
350303
}
304+
cancelledJobs = append(cancelledJobs, updatedJob)
305+
}
351306

352-
// if there is a job in the waiting status, increase tasks version.
353-
if hasWaiting {
354-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
355-
return err
356-
}
357-
}
358-
return nil
359-
})
307+
// Return nil to indicate successful cancellation of all running and waiting jobs.
308+
return cancelledJobs, nil
360309
}
361310

362311
func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) {
@@ -441,7 +390,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
441390
if err = run.LoadRepo(ctx); err != nil {
442391
return err
443392
}
444-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
393+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
445394
return err
446395
}
447396
}
@@ -450,3 +399,59 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
450399
}
451400

452401
type ActionRunIndex db.ResourceIndex
402+
403+
func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) {
404+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
405+
RepoID: repoID,
406+
ConcurrencyGroup: concurrencyGroup,
407+
Status: status,
408+
})
409+
if err != nil {
410+
return nil, nil, fmt.Errorf("find runs: %w", err)
411+
}
412+
413+
jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
414+
RepoID: repoID,
415+
ConcurrencyGroup: concurrencyGroup,
416+
Statuses: status,
417+
})
418+
if err != nil {
419+
return nil, nil, fmt.Errorf("find jobs: %w", err)
420+
}
421+
422+
return runs, jobs, nil
423+
}
424+
425+
func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
426+
if actionRun.ConcurrencyGroup == "" {
427+
return nil, nil
428+
}
429+
430+
var jobsToCancel []*ActionRunJob
431+
432+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
433+
if actionRun.ConcurrencyCancel {
434+
statusFindOption = append(statusFindOption, StatusRunning)
435+
}
436+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption)
437+
if err != nil {
438+
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
439+
}
440+
jobsToCancel = append(jobsToCancel, jobs...)
441+
442+
// cancel runs in the same concurrency group
443+
for _, run := range runs {
444+
if run.ID == actionRun.ID {
445+
continue
446+
}
447+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
448+
RunID: run.ID,
449+
})
450+
if err != nil {
451+
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
452+
}
453+
jobsToCancel = append(jobsToCancel, jobs...)
454+
}
455+
456+
return CancelJobs(ctx, jobsToCancel)
457+
}

models/actions/run_job.go

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,38 @@ type ActionRunJob struct {
2222
ID int64
2323
RunID int64 `xorm:"index"`
2424
Run *ActionRun `xorm:"-"`
25-
RepoID int64 `xorm:"index"`
25+
RepoID int64 `xorm:"index(repo_concurrency)"`
2626
Repo *repo_model.Repository `xorm:"-"`
2727
OwnerID int64 `xorm:"index"`
2828
CommitSHA string `xorm:"index"`
2929
IsForkPullRequest bool
3030
Name string `xorm:"VARCHAR(255)"`
3131
Attempt int64
32-
WorkflowPayload []byte
33-
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
34-
Needs []string `xorm:"JSON TEXT"`
35-
RunsOn []string `xorm:"JSON TEXT"`
36-
TaskID int64 // the latest task of the job
37-
Status Status `xorm:"index"`
38-
Started timeutil.TimeStamp
39-
Stopped timeutil.TimeStamp
40-
Created timeutil.TimeStamp `xorm:"created"`
41-
Updated timeutil.TimeStamp `xorm:"updated index"`
32+
33+
// WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse
34+
// it should contain exactly one job with global workflow fields for this model
35+
WorkflowPayload []byte
36+
37+
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
38+
Needs []string `xorm:"JSON TEXT"`
39+
RunsOn []string `xorm:"JSON TEXT"`
40+
TaskID int64 // the latest task of the job
41+
Status Status `xorm:"index"`
42+
43+
RawConcurrency string // raw concurrency from job YAML's "concurrency" section
44+
45+
// IsConcurrencyEvaluated is only valid/needed when this job's RawConcurrency is not empty.
46+
// If RawConcurrency can't be evaluated (e.g. depend on other job's outputs or have errors), this field will be false.
47+
// If RawConcurrency has been successfully evaluated, this field will be true, ConcurrencyGroup and ConcurrencyCancel are also set.
48+
IsConcurrencyEvaluated bool
49+
50+
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
51+
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
52+
53+
Started timeutil.TimeStamp
54+
Stopped timeutil.TimeStamp
55+
Created timeutil.TimeStamp `xorm:"created"`
56+
Updated timeutil.TimeStamp `xorm:"updated index"`
4257
}
4358

4459
func init() {
@@ -125,7 +140,7 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
125140
return affected, nil
126141
}
127142

128-
if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() {
143+
if slices.Contains(cols, "status") && job.Status.IsWaiting() {
129144
// if the status of job changes to waiting again, increase tasks version.
130145
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
131146
return 0, err
@@ -197,3 +212,39 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
197212
return StatusUnknown // it shouldn't happen
198213
}
199214
}
215+
216+
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) (jobsToCancel []*ActionRunJob, _ error) {
217+
if job.RawConcurrency == "" {
218+
return nil, nil
219+
}
220+
if !job.IsConcurrencyEvaluated {
221+
return nil, nil
222+
}
223+
if job.ConcurrencyGroup == "" {
224+
return nil, nil
225+
}
226+
227+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
228+
if job.ConcurrencyCancel {
229+
statusFindOption = append(statusFindOption, StatusRunning)
230+
}
231+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption)
232+
if err != nil {
233+
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
234+
}
235+
jobs = slices.DeleteFunc(jobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
236+
jobsToCancel = append(jobsToCancel, jobs...)
237+
238+
// cancel runs in the same concurrency group
239+
for _, run := range runs {
240+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
241+
RunID: run.ID,
242+
})
243+
if err != nil {
244+
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
245+
}
246+
jobsToCancel = append(jobsToCancel, jobs...)
247+
}
248+
249+
return CancelJobs(ctx, jobsToCancel)
250+
}

models/actions/run_job_list.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,13 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
6969

7070
type FindRunJobOptions struct {
7171
db.ListOptions
72-
RunID int64
73-
RepoID int64
74-
OwnerID int64
75-
CommitSHA string
76-
Statuses []Status
77-
UpdatedBefore timeutil.TimeStamp
72+
RunID int64
73+
RepoID int64
74+
OwnerID int64
75+
CommitSHA string
76+
Statuses []Status
77+
UpdatedBefore timeutil.TimeStamp
78+
ConcurrencyGroup string
7879
}
7980

8081
func (opts FindRunJobOptions) ToConds() builder.Cond {
@@ -94,6 +95,12 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
9495
if opts.UpdatedBefore > 0 {
9596
cond = cond.And(builder.Lt{"`action_run_job`.updated": opts.UpdatedBefore})
9697
}
98+
if opts.ConcurrencyGroup != "" {
99+
if opts.RepoID == 0 {
100+
panic("Invalid FindRunJobOptions: repo_id is required")
101+
}
102+
cond = cond.And(builder.Eq{"`action_run_job`.concurrency_group": opts.ConcurrencyGroup})
103+
}
97104
return cond
98105
}
99106

0 commit comments

Comments
 (0)