Skip to content

Commit aea9090

Browse files
committed
simplify code
1 parent a7a5842 commit aea9090

File tree

6 files changed

+73
-72
lines changed

6 files changed

+73
-72
lines changed

models/actions/run.go

Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -374,38 +374,3 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
374374
}
375375

376376
type ActionRunIndex db.ResourceIndex
377-
378-
func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
379-
// cancel previous jobs in the same concurrency group
380-
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
381-
RepoID: actionRunJob.RepoID,
382-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
383-
Statuses: []Status{
384-
StatusRunning,
385-
StatusWaiting,
386-
StatusBlocked,
387-
},
388-
})
389-
if err != nil {
390-
return fmt.Errorf("find previous jobs: %w", err)
391-
}
392-
393-
return CancelJobs(ctx, previousJobs)
394-
}
395-
396-
func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
397-
if actionRunJob.ConcurrencyCancel {
398-
return false, CancelConcurrentJobs(ctx, actionRunJob)
399-
}
400-
401-
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
402-
RepoID: actionRunJob.RepoID,
403-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
404-
Statuses: []Status{StatusRunning, StatusWaiting},
405-
})
406-
if err != nil {
407-
return false, fmt.Errorf("count waiting jobs: %w", err)
408-
}
409-
410-
return concurrentJobsNum > 0, nil
411-
}

models/actions/run_job.go

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ type ActionRunJob struct {
3434
TaskID int64 // the latest task of the job
3535
Status Status `xorm:"index"`
3636

37-
RawConcurrencyGroup string // raw concurrency.group
38-
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
39-
ConcurrencyGroup string // evaluated concurrency.group
40-
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
37+
RawConcurrencyGroup string // raw concurrency.group
38+
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
39+
IsConcurrencyEvaluated bool // whether RawConcurrencyGroup have been evaluated, only valid when RawConcurrencyGroup is not empty
40+
ConcurrencyGroup string // evaluated concurrency.group
41+
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
4142

4243
Started timeutil.TimeStamp
4344
Stopped timeutil.TimeStamp
@@ -190,3 +191,47 @@ func AggregateJobStatus(jobs []*ActionRunJob) Status {
190191
return StatusUnknown // it shouldn't happen
191192
}
192193
}
194+
195+
func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
196+
if len(actionRunJob.RawConcurrencyGroup) == 0 {
197+
return false, nil
198+
}
199+
if !actionRunJob.IsConcurrencyEvaluated {
200+
return false, fmt.Errorf("the raw concurrency group has not been evaluated")
201+
}
202+
if len(actionRunJob.ConcurrencyGroup) == 0 {
203+
return false, nil
204+
}
205+
if actionRunJob.ConcurrencyCancel {
206+
return false, CancelConcurrentJobs(ctx, actionRunJob)
207+
}
208+
209+
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
210+
RepoID: actionRunJob.RepoID,
211+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
212+
Statuses: []Status{StatusRunning, StatusWaiting},
213+
})
214+
if err != nil {
215+
return false, fmt.Errorf("count waiting jobs: %w", err)
216+
}
217+
218+
return concurrentJobsNum > 0, nil
219+
}
220+
221+
func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
222+
// cancel previous jobs in the same concurrency group
223+
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
224+
RepoID: actionRunJob.RepoID,
225+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
226+
Statuses: []Status{
227+
StatusRunning,
228+
StatusWaiting,
229+
StatusBlocked,
230+
},
231+
})
232+
if err != nil {
233+
return fmt.Errorf("find previous jobs: %w", err)
234+
}
235+
236+
return CancelJobs(ctx, previousJobs)
237+
}

models/migrations/v1_23/v312.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ func AddActionsConcurrency(x *xorm.Engine) error {
1818
}
1919

2020
type ActionRunJob struct {
21-
RawConcurrencyGroup string
22-
RawConcurrencyCancel string
23-
ConcurrencyGroup string
24-
ConcurrencyCancel bool
21+
RawConcurrencyGroup string
22+
RawConcurrencyCancel string
23+
IsConcurrencyEvaluated bool
24+
ConcurrencyGroup string
25+
ConcurrencyCancel bool
2526
}
2627

2728
return x.Sync(new(ActionRunJob))

routers/web/repo/actions/view.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -560,8 +560,11 @@ func Approve(ctx *context_module.Context) {
560560
return err
561561
}
562562
for _, job := range jobs {
563-
if len(job.Needs) == 0 && job.Status.IsBlocked() {
564-
// TODO: check concurrency
563+
blockedByConcurrency, err := actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, job)
564+
if err != nil {
565+
return err
566+
}
567+
if len(job.Needs) == 0 && job.Status.IsBlocked() && !blockedByConcurrency {
565568
job.Status = actions_model.StatusWaiting
566569
_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
567570
if err != nil {

services/actions/job_emitter.go

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_
275275
return false, err
276276
}
277277

278-
if len(actionRunJob.ConcurrencyGroup) == 0 {
278+
if !actionRunJob.IsConcurrencyEvaluated {
279279
// empty concurrency group means the raw concurrency has not been evaluated
280280
task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID)
281281
if err != nil {
@@ -300,25 +300,14 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_
300300
}
301301

302302
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
303-
ID: actionRunJob.ID,
304-
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
305-
ConcurrencyCancel: actionRunJob.ConcurrencyCancel,
303+
ID: actionRunJob.ID,
304+
IsConcurrencyEvaluated: true,
305+
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
306+
ConcurrencyCancel: actionRunJob.ConcurrencyCancel,
306307
}, nil); err != nil {
307308
return false, fmt.Errorf("update run job: %w", err)
308309
}
309310
}
310311

311-
if len(actionRunJob.ConcurrencyGroup) == 0 {
312-
// the job should not be blocked by concurrency if its concurrency group is empty
313-
return false, nil
314-
}
315-
316-
if actionRunJob.ConcurrencyCancel {
317-
if err := actions_model.CancelConcurrentJobs(ctx, actionRunJob); err != nil {
318-
return false, fmt.Errorf("cancel concurrent jobs: %w", err)
319-
}
320-
return false, nil
321-
}
322-
323312
return actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, actionRunJob)
324313
}

services/actions/run.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -116,23 +116,21 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
116116
if job.RawConcurrency != nil && len(job.RawConcurrency.Group) > 0 {
117117
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
118118
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
119-
// we do not need to evaluate job concurrency if the job is blocked
120-
// because it will be checked by job emitter
119+
// we do not need to evaluate job concurrency if the job is blocked because it will be checked by job emitter
121120
if runJob.Status != actions_model.StatusBlocked {
122121
var err error
123122
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = evaluateJobConcurrency(run, runJob, vars, map[string]*jobparser.JobResult{})
124123
if err != nil {
125124
return fmt.Errorf("evaluate job concurrency: %w", err)
126125
}
127-
if len(runJob.ConcurrencyGroup) > 0 {
128-
// check if the job should be blocked by job concurrency
129-
shouldBlock, err := actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, runJob)
130-
if err != nil {
131-
return err
132-
}
133-
if shouldBlock {
134-
runJob.Status = actions_model.StatusBlocked
135-
}
126+
runJob.IsConcurrencyEvaluated = true
127+
// check if the job should be blocked by job concurrency
128+
shouldBlock, err := actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, runJob)
129+
if err != nil {
130+
return err
131+
}
132+
if shouldBlock {
133+
runJob.Status = actions_model.StatusBlocked
136134
}
137135
}
138136
}

0 commit comments

Comments
 (0)