Skip to content

Commit 926a67b

Browse files
committed
Merge branch 'main' into lunny/remove_wiki_path_ref
2 parents f8a4cd1 + 40f71bc commit 926a67b

File tree

32 files changed

+2869
-287
lines changed

32 files changed

+2869
-287
lines changed

models/actions/run.go

Lines changed: 110 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@ import (
1616
user_model "code.gitea.io/gitea/models/user"
1717
"code.gitea.io/gitea/modules/git"
1818
"code.gitea.io/gitea/modules/json"
19+
"code.gitea.io/gitea/modules/log"
1920
"code.gitea.io/gitea/modules/setting"
2021
api "code.gitea.io/gitea/modules/structs"
2122
"code.gitea.io/gitea/modules/timeutil"
2223
"code.gitea.io/gitea/modules/util"
2324
webhook_module "code.gitea.io/gitea/modules/webhook"
2425

25-
"github.com/nektos/act/pkg/jobparser"
2626
"xorm.io/builder"
2727
)
2828

2929
// ActionRun represents a run of a workflow file
3030
type ActionRun struct {
3131
ID int64
3232
Title string
33-
RepoID int64 `xorm:"index unique(repo_index)"`
33+
RepoID int64 `xorm:"unique(repo_index) index(repo_concurrency)"`
3434
Repo *repo_model.Repository `xorm:"-"`
3535
OwnerID int64 `xorm:"index"`
3636
WorkflowID string `xorm:"index"` // the name of workflow file
@@ -49,6 +49,9 @@ type ActionRun struct {
4949
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
5050
Status Status `xorm:"index"`
5151
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
52+
RawConcurrency string // raw concurrency
53+
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"`
54+
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"`
5255
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5356
Started timeutil.TimeStamp
5457
Stopped timeutil.TimeStamp
@@ -102,6 +105,15 @@ func (run *ActionRun) PrettyRef() string {
102105
return refName.ShortName()
103106
}
104107

108+
// RefTooltip return a tooltop of run's ref. For pull request, it's the title of the PR, otherwise it's the ShortName.
109+
func (run *ActionRun) RefTooltip() string {
110+
payload, err := run.GetPullRequestEventPayload()
111+
if err == nil && payload != nil && payload.PullRequest != nil {
112+
return payload.PullRequest.Title
113+
}
114+
return git.RefName(run.Ref).ShortName()
115+
}
116+
105117
// LoadAttributes load Repo TriggerUser if not loaded
106118
func (run *ActionRun) LoadAttributes(ctx context.Context) error {
107119
if run == nil {
@@ -181,7 +193,7 @@ func (run *ActionRun) IsSchedule() bool {
181193
return run.ScheduleID > 0
182194
}
183195

184-
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
196+
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
185197
_, err := db.GetEngine(ctx).ID(repo.ID).
186198
NoAutoTime().
187199
SetExpr("num_action_runs",
@@ -238,116 +250,62 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
238250
return cancelledJobs, err
239251
}
240252

241-
// Iterate over each job and attempt to cancel it.
242-
for _, job := range jobs {
243-
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
244-
status := job.Status
245-
if status.IsDone() {
246-
continue
247-
}
248-
249-
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
250-
if job.TaskID == 0 {
251-
job.Status = StatusCancelled
252-
job.Stopped = timeutil.TimeStampNow()
253-
254-
// Update the job's status and stopped time in the database.
255-
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
256-
if err != nil {
257-
return cancelledJobs, err
258-
}
259-
260-
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
261-
if n == 0 {
262-
return cancelledJobs, errors.New("job has changed, try again")
263-
}
264-
265-
cancelledJobs = append(cancelledJobs, job)
266-
// Continue with the next job.
267-
continue
268-
}
269-
270-
// If the job has an associated task, try to stop the task, effectively cancelling the job.
271-
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
272-
return cancelledJobs, err
273-
}
274-
cancelledJobs = append(cancelledJobs, job)
253+
cjs, err := CancelJobs(ctx, jobs)
254+
if err != nil {
255+
return cancelledJobs, err
275256
}
257+
cancelledJobs = append(cancelledJobs, cjs...)
276258
}
277259

278260
// Return nil to indicate successful cancellation of all running and waiting jobs.
279261
return cancelledJobs, nil
280262
}
281263

282-
// InsertRun inserts a run
283-
// The title will be cut off at 255 characters if it's longer than 255 characters.
284-
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
285-
return db.WithTx(ctx, func(ctx context.Context) error {
286-
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
287-
if err != nil {
288-
return err
264+
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) ([]*ActionRunJob, error) {
265+
cancelledJobs := make([]*ActionRunJob, 0, len(jobs))
266+
// Iterate over each job and attempt to cancel it.
267+
for _, job := range jobs {
268+
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
269+
status := job.Status
270+
if status.IsDone() {
271+
continue
289272
}
290-
run.Index = index
291-
run.Title = util.EllipsisDisplayString(run.Title, 255)
292273

293-
if err := db.Insert(ctx, run); err != nil {
294-
return err
295-
}
274+
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
275+
if job.TaskID == 0 {
276+
job.Status = StatusCancelled
277+
job.Stopped = timeutil.TimeStampNow()
296278

297-
if run.Repo == nil {
298-
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
279+
// Update the job's status and stopped time in the database.
280+
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
299281
if err != nil {
300-
return err
282+
return cancelledJobs, err
301283
}
302-
run.Repo = repo
303-
}
304284

305-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
306-
return err
285+
// If the update affected 0 rows, it means the job has changed in the meantime
286+
if n == 0 {
287+
log.Error("Failed to cancel job %d because it has changed", job.ID)
288+
continue
289+
}
290+
291+
cancelledJobs = append(cancelledJobs, job)
292+
// Continue with the next job.
293+
continue
307294
}
308295

309-
runJobs := make([]*ActionRunJob, 0, len(jobs))
310-
var hasWaiting bool
311-
for _, v := range jobs {
312-
id, job := v.Job()
313-
needs := job.Needs()
314-
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
315-
return err
316-
}
317-
payload, _ := v.Marshal()
318-
status := StatusWaiting
319-
if len(needs) > 0 || run.NeedApproval {
320-
status = StatusBlocked
321-
} else {
322-
hasWaiting = true
323-
}
324-
job.Name = util.EllipsisDisplayString(job.Name, 255)
325-
runJobs = append(runJobs, &ActionRunJob{
326-
RunID: run.ID,
327-
RepoID: run.RepoID,
328-
OwnerID: run.OwnerID,
329-
CommitSHA: run.CommitSHA,
330-
IsForkPullRequest: run.IsForkPullRequest,
331-
Name: job.Name,
332-
WorkflowPayload: payload,
333-
JobID: id,
334-
Needs: needs,
335-
RunsOn: job.RunsOn(),
336-
Status: status,
337-
})
296+
// If the job has an associated task, try to stop the task, effectively cancelling the job.
297+
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
298+
return cancelledJobs, err
338299
}
339-
if err := db.Insert(ctx, runJobs); err != nil {
340-
return err
300+
updatedJob, err := GetRunJobByID(ctx, job.ID)
301+
if err != nil {
302+
return cancelledJobs, fmt.Errorf("get job: %w", err)
341303
}
304+
cancelledJobs = append(cancelledJobs, updatedJob)
305+
}
342306

343-
// if there is a job in the waiting status, increase tasks version.
344-
if hasWaiting {
345-
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
346-
return err
347-
}
348-
}
349-
return nil
350-
})
307+
// Return nil to indicate successful cancellation of all running and waiting jobs.
308+
return cancelledJobs, nil
351309
}
352310

353311
func GetRunByRepoAndID(ctx context.Context, repoID, runID int64) (*ActionRun, error) {
@@ -432,7 +390,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
432390
if err = run.LoadRepo(ctx); err != nil {
433391
return err
434392
}
435-
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
393+
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
436394
return err
437395
}
438396
}
@@ -441,3 +399,59 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
441399
}
442400

443401
type ActionRunIndex db.ResourceIndex
402+
403+
func GetConcurrentRunsAndJobs(ctx context.Context, repoID int64, concurrencyGroup string, status []Status) ([]*ActionRun, []*ActionRunJob, error) {
404+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
405+
RepoID: repoID,
406+
ConcurrencyGroup: concurrencyGroup,
407+
Status: status,
408+
})
409+
if err != nil {
410+
return nil, nil, fmt.Errorf("find runs: %w", err)
411+
}
412+
413+
jobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
414+
RepoID: repoID,
415+
ConcurrencyGroup: concurrencyGroup,
416+
Statuses: status,
417+
})
418+
if err != nil {
419+
return nil, nil, fmt.Errorf("find jobs: %w", err)
420+
}
421+
422+
return runs, jobs, nil
423+
}
424+
425+
func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
426+
if actionRun.ConcurrencyGroup == "" {
427+
return nil, nil
428+
}
429+
430+
var jobsToCancel []*ActionRunJob
431+
432+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
433+
if actionRun.ConcurrencyCancel {
434+
statusFindOption = append(statusFindOption, StatusRunning)
435+
}
436+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, actionRun.RepoID, actionRun.ConcurrencyGroup, statusFindOption)
437+
if err != nil {
438+
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
439+
}
440+
jobsToCancel = append(jobsToCancel, jobs...)
441+
442+
// cancel runs in the same concurrency group
443+
for _, run := range runs {
444+
if run.ID == actionRun.ID {
445+
continue
446+
}
447+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
448+
RunID: run.ID,
449+
})
450+
if err != nil {
451+
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
452+
}
453+
jobsToCancel = append(jobsToCancel, jobs...)
454+
}
455+
456+
return CancelJobs(ctx, jobsToCancel)
457+
}

models/actions/run_job.go

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,38 @@ type ActionRunJob struct {
2222
ID int64
2323
RunID int64 `xorm:"index"`
2424
Run *ActionRun `xorm:"-"`
25-
RepoID int64 `xorm:"index"`
25+
RepoID int64 `xorm:"index(repo_concurrency)"`
2626
Repo *repo_model.Repository `xorm:"-"`
2727
OwnerID int64 `xorm:"index"`
2828
CommitSHA string `xorm:"index"`
2929
IsForkPullRequest bool
3030
Name string `xorm:"VARCHAR(255)"`
3131
Attempt int64
32-
WorkflowPayload []byte
33-
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
34-
Needs []string `xorm:"JSON TEXT"`
35-
RunsOn []string `xorm:"JSON TEXT"`
36-
TaskID int64 // the latest task of the job
37-
Status Status `xorm:"index"`
38-
Started timeutil.TimeStamp
39-
Stopped timeutil.TimeStamp
40-
Created timeutil.TimeStamp `xorm:"created"`
41-
Updated timeutil.TimeStamp `xorm:"updated index"`
32+
33+
// WorkflowPayload is act/jobparser.SingleWorkflow for act/jobparser.Parse
34+
// it should contain exactly one job with global workflow fields for this model
35+
WorkflowPayload []byte
36+
37+
JobID string `xorm:"VARCHAR(255)"` // job id in workflow, not job's id
38+
Needs []string `xorm:"JSON TEXT"`
39+
RunsOn []string `xorm:"JSON TEXT"`
40+
TaskID int64 // the latest task of the job
41+
Status Status `xorm:"index"`
42+
43+
RawConcurrency string // raw concurrency from job YAML's "concurrency" section
44+
45+
// IsConcurrencyEvaluated is only valid/needed when this job's RawConcurrency is not empty.
46+
// If RawConcurrency can't be evaluated (e.g. depend on other job's outputs or have errors), this field will be false.
47+
// If RawConcurrency has been successfully evaluated, this field will be true, ConcurrencyGroup and ConcurrencyCancel are also set.
48+
IsConcurrencyEvaluated bool
49+
50+
ConcurrencyGroup string `xorm:"index(repo_concurrency) NOT NULL DEFAULT ''"` // evaluated concurrency.group
51+
ConcurrencyCancel bool `xorm:"NOT NULL DEFAULT FALSE"` // evaluated concurrency.cancel-in-progress
52+
53+
Started timeutil.TimeStamp
54+
Stopped timeutil.TimeStamp
55+
Created timeutil.TimeStamp `xorm:"created"`
56+
Updated timeutil.TimeStamp `xorm:"updated index"`
4257
}
4358

4459
func init() {
@@ -125,7 +140,7 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col
125140
return affected, nil
126141
}
127142

128-
if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() {
143+
if slices.Contains(cols, "status") && job.Status.IsWaiting() {
129144
// if the status of job changes to waiting again, increase tasks version.
130145
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
131146
return 0, err
@@ -197,3 +212,39 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
197212
return StatusUnknown // it shouldn't happen
198213
}
199214
}
215+
216+
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) (jobsToCancel []*ActionRunJob, _ error) {
217+
if job.RawConcurrency == "" {
218+
return nil, nil
219+
}
220+
if !job.IsConcurrencyEvaluated {
221+
return nil, nil
222+
}
223+
if job.ConcurrencyGroup == "" {
224+
return nil, nil
225+
}
226+
227+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
228+
if job.ConcurrencyCancel {
229+
statusFindOption = append(statusFindOption, StatusRunning)
230+
}
231+
runs, jobs, err := GetConcurrentRunsAndJobs(ctx, job.RepoID, job.ConcurrencyGroup, statusFindOption)
232+
if err != nil {
233+
return nil, fmt.Errorf("find concurrent runs and jobs: %w", err)
234+
}
235+
jobs = slices.DeleteFunc(jobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
236+
jobsToCancel = append(jobsToCancel, jobs...)
237+
238+
// cancel runs in the same concurrency group
239+
for _, run := range runs {
240+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
241+
RunID: run.ID,
242+
})
243+
if err != nil {
244+
return nil, fmt.Errorf("find run %d jobs: %w", run.ID, err)
245+
}
246+
jobsToCancel = append(jobsToCancel, jobs...)
247+
}
248+
249+
return CancelJobs(ctx, jobsToCancel)
250+
}

0 commit comments

Comments
 (0)