Skip to content

Commit 796d2c1

Browse files
committed
Use a single RawConcurrency in db
1 parent d5f6c44 commit 796d2c1

File tree

10 files changed

+100
-78
lines changed

10 files changed

+100
-78
lines changed

models/actions/run.go

Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,30 @@ import (
2727

2828
// ActionRun represents a run of a workflow file
2929
type ActionRun struct {
30-
ID int64
31-
Title string
32-
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
33-
Repo *repo_model.Repository `xorm:"-"`
34-
OwnerID int64 `xorm:"index"`
35-
WorkflowID string `xorm:"index"` // the name of workflow file
36-
Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
37-
TriggerUserID int64 `xorm:"index"`
38-
TriggerUser *user_model.User `xorm:"-"`
39-
ScheduleID int64
40-
Ref string `xorm:"index"` // the commit/tag/… that caused the run
41-
IsRefDeleted bool `xorm:"-"`
42-
CommitSHA string
43-
IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
44-
NeedApproval bool // may need approval if it's a fork pull request
45-
ApprovedBy int64 `xorm:"index"` // who approved
46-
Event webhook_module.HookEventType // the webhook event that causes the workflow to run
47-
EventPayload string `xorm:"LONGTEXT"`
48-
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
49-
Status Status `xorm:"index"`
50-
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
51-
RawConcurrencyGroup string
52-
RawConcurrencyCancel string
53-
ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
54-
ConcurrencyCancel bool
30+
ID int64
31+
Title string
32+
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
33+
Repo *repo_model.Repository `xorm:"-"`
34+
OwnerID int64 `xorm:"index"`
35+
WorkflowID string `xorm:"index"` // the name of workflow file
36+
Index int64 `xorm:"index unique(repo_index)"` // a unique number for each run of a repository
37+
TriggerUserID int64 `xorm:"index"`
38+
TriggerUser *user_model.User `xorm:"-"`
39+
ScheduleID int64
40+
Ref string `xorm:"index"` // the commit/tag/… that caused the run
41+
IsRefDeleted bool `xorm:"-"`
42+
CommitSHA string
43+
IsForkPullRequest bool // If this is triggered by a PR from a forked repository or an untrusted user, we need to check if it is approved and limit permissions when running the workflow.
44+
NeedApproval bool // may need approval if it's a fork pull request
45+
ApprovedBy int64 `xorm:"index"` // who approved
46+
Event webhook_module.HookEventType // the webhook event that causes the workflow to run
47+
EventPayload string `xorm:"LONGTEXT"`
48+
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
49+
Status Status `xorm:"index"`
50+
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
51+
RawConcurrency string // raw concurrency
52+
ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
53+
ConcurrencyCancel bool
5554
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
5655
Started timeutil.TimeStamp
5756
Stopped timeutil.TimeStamp

models/actions/run_job.go

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ type ActionRunJob struct {
3636
TaskID int64 // the latest task of the job
3737
Status Status `xorm:"index"`
3838

39-
RawConcurrencyGroup string // raw concurrency.group
40-
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
41-
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
39+
RawConcurrency string // raw concurrency
40+
IsConcurrencyEvaluated bool // whether RawConcurrency has been evaluated, only valid when RawConcurrency is not empty
4241
ConcurrencyGroup string `xorm:"index(repo_concurrency)"` // evaluated concurrency.group
4342
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
4443

@@ -206,13 +205,12 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
206205
}
207206

208207
func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool, error) {
209-
if job.RawConcurrencyGroup == "" {
208+
if job.RawConcurrency == "" {
210209
return false, nil
211210
}
212211
if !job.IsConcurrencyEvaluated {
213212
return false, ErrUnevaluatedConcurrency{
214-
Group: job.RawConcurrencyGroup,
215-
CancelInProgress: job.RawConcurrencyCancel,
213+
RawConcurrency: job.RawConcurrency,
216214
}
217215
}
218216
if job.ConcurrencyGroup == "" || job.ConcurrencyCancel {
@@ -228,16 +226,15 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
228226
}
229227

230228
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
231-
if job.RawConcurrencyGroup == "" {
229+
if job.RawConcurrency == "" {
232230
return nil, nil
233231
}
234232

235233
var jobsToCancel []*ActionRunJob
236234

237235
if !job.IsConcurrencyEvaluated {
238236
return nil, ErrUnevaluatedConcurrency{
239-
Group: job.RawConcurrencyGroup,
240-
CancelInProgress: job.RawConcurrencyCancel,
237+
RawConcurrency: job.RawConcurrency,
241238
}
242239
}
243240
if job.ConcurrencyGroup == "" {
@@ -270,8 +267,7 @@ func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob)
270267
}
271268

272269
type ErrUnevaluatedConcurrency struct {
273-
Group string
274-
CancelInProgress string
270+
RawConcurrency string
275271
}
276272

277273
func IsErrUnevaluatedConcurrency(err error) bool {
@@ -280,5 +276,5 @@ func IsErrUnevaluatedConcurrency(err error) bool {
280276
}
281277

282278
func (err ErrUnevaluatedConcurrency) Error() string {
283-
return fmt.Sprintf("the raw concurrency [group=%s, cancel-in-progress=%s] is not evaluated", err.Group, err.CancelInProgress)
279+
return fmt.Sprintf("the raw concurrency [%s] is not evaluated", err.RawConcurrency)
284280
}

models/migrations/v1_25/v322.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ import (
99

1010
func AddActionsConcurrency(x *xorm.Engine) error {
1111
type ActionRun struct {
12-
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
13-
RawConcurrencyGroup string
14-
RawConcurrencyCancel string
15-
ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
16-
ConcurrencyCancel bool
12+
RepoID int64 `xorm:"index unique(repo_index) index(repo_concurrency)"`
13+
RawConcurrency string
14+
ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
15+
ConcurrencyCancel bool
1716
}
1817

1918
if err := x.Sync(new(ActionRun)); err != nil {
@@ -22,8 +21,7 @@ func AddActionsConcurrency(x *xorm.Engine) error {
2221

2322
type ActionRunJob struct {
2423
RepoID int64 `xorm:"index index(repo_concurrency)"`
25-
RawConcurrencyGroup string
26-
RawConcurrencyCancel string
24+
RawConcurrency string
2725
IsConcurrencyEvaluated bool
2826
ConcurrencyGroup string `xorm:"index(repo_concurrency)"`
2927
ConcurrencyCancel bool

routers/web/repo/actions/view.go

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
actions_service "code.gitea.io/gitea/services/actions"
3434
context_module "code.gitea.io/gitea/services/context"
3535
notify_service "code.gitea.io/gitea/services/notify"
36+
"gopkg.in/yaml.v3"
3637

3738
"github.com/nektos/act/pkg/model"
3839
"xorm.io/builder"
@@ -440,32 +441,36 @@ func Rerun(ctx *context_module.Context) {
440441
return
441442
}
442443

443-
wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(ctx, run, &model.RawConcurrency{
444-
Group: run.RawConcurrencyGroup,
445-
CancelInProgress: run.RawConcurrencyCancel,
446-
}, vars)
447-
if err != nil {
448-
ctx.ServerError("EvaluateWorkflowConcurrency", fmt.Errorf("evaluate workflow concurrency: %w", err))
449-
return
450-
}
451-
if wfConcurrencyGroup != "" {
452-
run.ConcurrencyGroup = wfConcurrencyGroup
453-
run.ConcurrencyCancel = wfConcurrencyCancel
454-
}
444+
if run.RawConcurrency != "" {
445+
var rawConcurrency model.RawConcurrency
446+
if err := yaml.Unmarshal([]byte(run.RawConcurrency), &rawConcurrency); err != nil {
447+
ctx.ServerError("UnmarshalRawConcurrency", fmt.Errorf("unmarshal raw concurrency: %w", err))
448+
return
449+
}
450+
wfConcurrencyGroup, wfConcurrencyCancel, err := actions_service.EvaluateWorkflowConcurrency(ctx, run, &rawConcurrency, vars)
451+
if err != nil {
452+
ctx.ServerError("EvaluateWorkflowConcurrency", fmt.Errorf("evaluate workflow concurrency: %w", err))
453+
return
454+
}
455+
if wfConcurrencyGroup != "" {
456+
run.ConcurrencyGroup = wfConcurrencyGroup
457+
run.ConcurrencyCancel = wfConcurrencyCancel
458+
}
455459

456-
blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run)
457-
if err != nil {
458-
ctx.ServerError("ShouldBlockRunByConcurrency", err)
459-
return
460-
}
461-
if blockRunByConcurrency {
462-
run.Status = actions_model.StatusBlocked
463-
} else {
464-
run.Status = actions_model.StatusRunning
465-
}
466-
if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
467-
ctx.ServerError("cancel jobs", err)
468-
return
460+
blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run)
461+
if err != nil {
462+
ctx.ServerError("ShouldBlockRunByConcurrency", err)
463+
return
464+
}
465+
if blockRunByConcurrency {
466+
run.Status = actions_model.StatusBlocked
467+
} else {
468+
run.Status = actions_model.StatusRunning
469+
}
470+
if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
471+
ctx.ServerError("cancel jobs", err)
472+
return
473+
}
469474
}
470475
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status", "concurrency_group", "concurrency_cancel"); err != nil {
471476
ctx.ServerError("UpdateRun", err)
@@ -524,7 +529,7 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou
524529
if err != nil {
525530
return fmt.Errorf("get run %d variables: %w", job.Run.ID, err)
526531
}
527-
if job.RawConcurrencyGroup != "" && job.Status != actions_model.StatusBlocked {
532+
if job.RawConcurrency != "" && job.Status != actions_model.StatusBlocked {
528533
var err error
529534
job.ConcurrencyGroup, job.ConcurrencyCancel, err = actions_service.EvaluateJobConcurrency(ctx, job.Run, job, vars, nil)
530535
if err != nil {

services/actions/concurrency.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
actions_model "code.gitea.io/gitea/models/actions"
1212
"code.gitea.io/gitea/modules/json"
1313
api "code.gitea.io/gitea/modules/structs"
14+
"gopkg.in/yaml.v3"
1415

1516
"github.com/nektos/act/pkg/jobparser"
1617
act_model "github.com/nektos/act/pkg/model"
@@ -41,9 +42,9 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a
4142
return "", false, fmt.Errorf("job LoadAttributes: %w", err)
4243
}
4344

44-
rawConcurrency := &act_model.RawConcurrency{
45-
Group: actionRunJob.RawConcurrencyGroup,
46-
CancelInProgress: actionRunJob.RawConcurrencyCancel,
45+
var rawConcurrency act_model.RawConcurrency
46+
if err := yaml.Unmarshal([]byte(actionRunJob.RawConcurrency), &rawConcurrency); err != nil {
47+
return "", false, fmt.Errorf("unmarshal raw concurrency: %w", err)
4748
}
4849

4950
gitCtx := GenerateGiteaContext(run, actionRunJob)
@@ -66,7 +67,7 @@ func EvaluateJobConcurrency(ctx context.Context, run *actions_model.ActionRun, a
6667
}
6768
_, singleWorkflowJob := singleWorkflows[0].Job()
6869

69-
concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs)
70+
concurrencyGroup, concurrencyCancel, err := jobparser.EvaluateConcurrency(&rawConcurrency, actionRunJob.JobID, singleWorkflowJob, gitCtx, jobResults, vars, inputs)
7071
if err != nil {
7172
return "", false, fmt.Errorf("evaluate concurrency: %w", err)
7273
}

services/actions/job_emitter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
326326
}
327327

328328
func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
329-
if actionRunJob.RawConcurrencyGroup == "" {
329+
if actionRunJob.RawConcurrency == "" {
330330
return false, nil
331331
}
332332
if err := actionRunJob.LoadAttributes(ctx); err != nil {

services/actions/notifier_helper.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
webhook_module "code.gitea.io/gitea/modules/webhook"
2929
"code.gitea.io/gitea/services/convert"
3030
notify_service "code.gitea.io/gitea/services/notify"
31+
"gopkg.in/yaml.v3"
3132

3233
"github.com/nektos/act/pkg/jobparser"
3334
"github.com/nektos/act/pkg/model"
@@ -363,6 +364,12 @@ func handleWorkflows(
363364
continue
364365
}
365366
if wfRawConcurrency != nil {
367+
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
368+
if err != nil {
369+
log.Error("Marshal raw concurrency: %v", err)
370+
continue
371+
}
372+
run.RawConcurrency = string(rawConcurrency)
366373
wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars)
367374
if err != nil {
368375
log.Error("EvaluateWorkflowConcurrency: %v", err)

services/actions/run.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"code.gitea.io/gitea/models/db"
1212
repo_model "code.gitea.io/gitea/models/repo"
1313
"code.gitea.io/gitea/modules/util"
14+
"gopkg.in/yaml.v3"
1415

1516
"github.com/nektos/act/pkg/jobparser"
1617
)
@@ -90,9 +91,12 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
9091
Status: status,
9192
}
9293
// check job concurrency
93-
if job.RawConcurrency != nil && job.RawConcurrency.Group != "" {
94-
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
95-
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
94+
if job.RawConcurrency != nil {
95+
rawConcurrency, err := yaml.Marshal(job.RawConcurrency)
96+
if err != nil {
97+
return fmt.Errorf("marshal raw concurrency: %w", err)
98+
}
99+
runJob.RawConcurrency = string(rawConcurrency)
96100
// do not evaluate job concurrency when it requires `needs`
97101
if len(needs) == 0 {
98102
var err error

services/actions/schedule_tasks.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"code.gitea.io/gitea/modules/timeutil"
1717
webhook_module "code.gitea.io/gitea/modules/webhook"
1818
notify_service "code.gitea.io/gitea/services/notify"
19+
"gopkg.in/yaml.v3"
1920

2021
"github.com/nektos/act/pkg/jobparser"
2122
)
@@ -135,6 +136,11 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
135136
return err
136137
}
137138
if wfRawConcurrency != nil {
139+
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
140+
if err != nil {
141+
return fmt.Errorf("marshal raw concurrency: %w", err)
142+
}
143+
run.RawConcurrency = string(rawConcurrency)
138144
wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars)
139145
if err != nil {
140146
return err

services/actions/workflow.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"code.gitea.io/gitea/services/context"
2424
"code.gitea.io/gitea/services/convert"
2525
notify_service "code.gitea.io/gitea/services/notify"
26+
"gopkg.in/yaml.v3"
2627

2728
"github.com/nektos/act/pkg/jobparser"
2829
"github.com/nektos/act/pkg/model"
@@ -194,6 +195,11 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re
194195
if err != nil {
195196
return err
196197
}
198+
rawConcurrency, err := yaml.Marshal(wfRawConcurrency)
199+
if err != nil {
200+
return fmt.Errorf("marshal raw concurrency: %w", err)
201+
}
202+
run.RawConcurrency = string(rawConcurrency)
197203
wfConcurrencyGroup, wfConcurrencyCancel, err := EvaluateWorkflowConcurrency(ctx, run, wfRawConcurrency, vars)
198204
if err != nil {
199205
return err

0 commit comments

Comments
 (0)