Skip to content

Commit 63eaf15

Browse files
committed
modify cancel-in-progress behavior
1 parent c5444e7 commit 63eaf15

File tree

6 files changed

+127
-92
lines changed

6 files changed

+127
-92
lines changed

models/actions/run.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -415,34 +415,40 @@ func ShouldBlockRunByConcurrency(ctx context.Context, actionRun *ActionRun) (boo
415415
}
416416

417417
func CancelPreviousJobsByRunConcurrency(ctx context.Context, actionRun *ActionRun) ([]*ActionRunJob, error) {
418+
if actionRun.ConcurrencyGroup == "" {
419+
return nil, nil
420+
}
421+
418422
var cancelledJobs []*ActionRunJob
419423

420-
if actionRun.ConcurrencyGroup != "" && actionRun.ConcurrencyCancel {
421-
// cancel previous runs in the same concurrency group
422-
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
423-
RepoID: actionRun.RepoID,
424-
ConcurrencyGroup: actionRun.ConcurrencyGroup,
425-
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
424+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
425+
if actionRun.ConcurrencyCancel {
426+
statusFindOption = append(statusFindOption, StatusRunning)
427+
}
428+
// cancel previous runs in the same concurrency group
429+
runs, err := db.Find[ActionRun](ctx, &FindRunOptions{
430+
RepoID: actionRun.RepoID,
431+
ConcurrencyGroup: actionRun.ConcurrencyGroup,
432+
Status: statusFindOption,
433+
})
434+
if err != nil {
435+
return cancelledJobs, fmt.Errorf("find runs: %w", err)
436+
}
437+
for _, run := range runs {
438+
if run.ID == actionRun.ID {
439+
continue
440+
}
441+
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
442+
RunID: run.ID,
426443
})
427444
if err != nil {
428-
return cancelledJobs, fmt.Errorf("find runs: %w", err)
445+
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
429446
}
430-
for _, run := range runs {
431-
if run.ID == actionRun.ID {
432-
continue
433-
}
434-
jobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
435-
RunID: run.ID,
436-
})
437-
if err != nil {
438-
return cancelledJobs, fmt.Errorf("find run %d jobs: %w", run.ID, err)
439-
}
440-
cjs, err := CancelJobs(ctx, jobs)
441-
if err != nil {
442-
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
443-
}
444-
cancelledJobs = append(cancelledJobs, cjs...)
447+
cjs, err := CancelJobs(ctx, jobs)
448+
if err != nil {
449+
return cancelledJobs, fmt.Errorf("cancel run %d jobs: %w", run.ID, err)
445450
}
451+
cancelledJobs = append(cancelledJobs, cjs...)
446452
}
447453

448454
return cancelledJobs, nil

models/actions/run_job.go

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -239,32 +239,38 @@ func ShouldBlockJobByConcurrency(ctx context.Context, job *ActionRunJob) (bool,
239239
}
240240

241241
func CancelPreviousJobsByJobConcurrency(ctx context.Context, job *ActionRunJob) ([]*ActionRunJob, error) {
242+
if job.RawConcurrencyGroup == "" {
243+
return nil, nil
244+
}
245+
242246
var cancelledJobs []*ActionRunJob
243247

244-
if job.RawConcurrencyGroup != "" {
245-
if !job.IsConcurrencyEvaluated {
246-
return cancelledJobs, ErrUnevaluatedConcurrency{
247-
Group: job.RawConcurrencyGroup,
248-
CancelInProgress: job.RawConcurrencyCancel,
249-
}
248+
if !job.IsConcurrencyEvaluated {
249+
return cancelledJobs, ErrUnevaluatedConcurrency{
250+
Group: job.RawConcurrencyGroup,
251+
CancelInProgress: job.RawConcurrencyCancel,
250252
}
251-
if job.ConcurrencyGroup != "" && job.ConcurrencyCancel {
252-
// cancel previous jobs in the same concurrency group
253-
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
254-
RepoID: job.RepoID,
255-
ConcurrencyGroup: job.ConcurrencyGroup,
256-
Statuses: []Status{StatusRunning, StatusWaiting, StatusBlocked},
257-
})
258-
if err != nil {
259-
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
260-
}
261-
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
262-
cjs, err := CancelJobs(ctx, previousJobs)
263-
if err != nil {
264-
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
265-
}
266-
cancelledJobs = append(cancelledJobs, cjs...)
253+
}
254+
if job.ConcurrencyGroup != "" {
255+
statusFindOption := []Status{StatusWaiting, StatusBlocked}
256+
if job.ConcurrencyCancel {
257+
statusFindOption = append(statusFindOption, StatusRunning)
258+
}
259+
// cancel previous jobs in the same concurrency group
260+
previousJobs, err := db.Find[ActionRunJob](ctx, &FindRunJobOptions{
261+
RepoID: job.RepoID,
262+
ConcurrencyGroup: job.ConcurrencyGroup,
263+
Statuses: statusFindOption,
264+
})
265+
if err != nil {
266+
return cancelledJobs, fmt.Errorf("find previous jobs: %w", err)
267+
}
268+
previousJobs = slices.DeleteFunc(previousJobs, func(j *ActionRunJob) bool { return j.ID == job.ID })
269+
cjs, err := CancelJobs(ctx, previousJobs)
270+
if err != nil {
271+
return cancelledJobs, fmt.Errorf("cancel previous jobs: %w", err)
267272
}
273+
cancelledJobs = append(cancelledJobs, cjs...)
268274
}
269275

270276
return cancelledJobs, nil

routers/web/repo/actions/view.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -443,12 +443,13 @@ func Rerun(ctx *context_module.Context) {
443443
}
444444
if blockRunByConcurrency {
445445
run.Status = actions_model.StatusBlocked
446-
} else if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
447-
ctx.ServerError("cancel jobs", err)
448-
return
449446
} else {
450447
run.Status = actions_model.StatusRunning
451448
}
449+
if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
450+
ctx.ServerError("cancel jobs", err)
451+
return
452+
}
452453
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status"); err != nil {
453454
ctx.ServerError("UpdateRun", err)
454455
return
@@ -520,7 +521,8 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou
520521
}
521522
if blockByConcurrency {
522523
job.Status = actions_model.StatusBlocked
523-
} else if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil {
524+
}
525+
if err := actions_service.CancelJobsByJobConcurrency(ctx, job); err != nil {
524526
return fmt.Errorf("cancel jobs: %w", err)
525527
}
526528
}

services/actions/job_emitter.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -285,16 +285,13 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
285285
}
286286
}
287287
if allDone {
288-
// check concurrency
289-
blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars)
290-
if err != nil {
288+
// evaluate concurrency
289+
if err := evaluateConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars); err != nil {
291290
log.Error("Check job %d concurrency: %v. This job will stay blocked.", id, err)
292291
continue
293292
}
294293

295-
if blockedByJobConcurrency {
296-
continue
297-
} else if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil {
294+
if err := CancelJobsByJobConcurrency(ctx, r.jobMap[id]); err != nil {
298295
log.Error("Cancel previous jobs for job %d: %v", id, err)
299296
}
300297

@@ -322,18 +319,18 @@ func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model
322319
return ret
323320
}
324321

325-
func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
322+
func evaluateConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) error {
326323
if actionRunJob.RawConcurrencyGroup == "" {
327-
return false, nil
324+
return nil
328325
}
329326
if err := actionRunJob.LoadAttributes(ctx); err != nil {
330-
return false, err
327+
return err
331328
}
332329

333330
if !actionRunJob.IsConcurrencyEvaluated {
334331
taskNeeds, err := FindTaskNeeds(ctx, actionRunJob)
335332
if err != nil {
336-
return false, fmt.Errorf("find task needs: %w", err)
333+
return fmt.Errorf("find task needs: %w", err)
337334
}
338335
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
339336
for jobID, taskNeed := range taskNeeds {
@@ -346,14 +343,14 @@ func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_
346343

347344
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, actionRunJob.Run, actionRunJob, vars, jobResults)
348345
if err != nil {
349-
return false, fmt.Errorf("evaluate job concurrency: %w", err)
346+
return fmt.Errorf("evaluate job concurrency: %w", err)
350347
}
351348
actionRunJob.IsConcurrencyEvaluated = true
352349

353350
if _, err := actions_model.UpdateRunJob(ctx, actionRunJob, nil, "concurrency_group", "concurrency_cancel", "is_concurrency_evaluated"); err != nil {
354-
return false, fmt.Errorf("update run job: %w", err)
351+
return fmt.Errorf("update run job: %w", err)
355352
}
356353
}
357354

358-
return actions_model.ShouldBlockJobByConcurrency(ctx, actionRunJob)
355+
return nil
359356
}

services/actions/run.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
3838
}
3939
if blockRunByConcurrency {
4040
run.Status = actions_model.StatusBlocked
41-
} else if err := CancelJobsByRunConcurrency(ctx, run); err != nil {
41+
}
42+
if err := CancelJobsByRunConcurrency(ctx, run); err != nil {
4243
return fmt.Errorf("cancel jobs: %w", err)
4344
}
4445

@@ -98,22 +99,26 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
9899
if job.RawConcurrency != nil && job.RawConcurrency.Group != "" {
99100
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
100101
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
101-
// we do not need to evaluate job concurrency if the job is blocked because it will be checked by job emitter
102-
if runJob.Status != actions_model.StatusBlocked {
102+
// do not evaluate job concurrency when it requires `needs`
103+
if len(needs) == 0 {
103104
var err error
104105
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = EvaluateJobConcurrency(ctx, run, runJob, vars, nil)
105106
if err != nil {
106107
return fmt.Errorf("evaluate job concurrency: %w", err)
107108
}
108109
runJob.IsConcurrencyEvaluated = true
110+
}
111+
// do not need to check job concurrency if the job is blocked because it will be checked by job emitter
112+
if runJob.Status != actions_model.StatusBlocked {
109113
// check if the job should be blocked by job concurrency
110114
blockByConcurrency, err := actions_model.ShouldBlockJobByConcurrency(ctx, runJob)
111115
if err != nil {
112116
return err
113117
}
114118
if blockByConcurrency {
115119
runJob.Status = actions_model.StatusBlocked
116-
} else if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil {
120+
}
121+
if err := CancelJobsByJobConcurrency(ctx, runJob); err != nil {
117122
return fmt.Errorf("cancel jobs: %w", err)
118123
}
119124
}

tests/integration/actions_concurrency_test.go

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,7 +1030,7 @@ jobs:
10301030
concurrency:
10311031
group: job-group-1
10321032
steps:
1033-
- run: echo 'wf2-job2'
1033+
- run: echo 'wf2-job1'
10341034
wf2-job2:
10351035
runs-on: runner2
10361036
concurrency:
@@ -1073,70 +1073,89 @@ jobs:
10731073
- run: echo 'wf4-job1'
10741074
`
10751075

1076-
// push workflow 1, 2 and 3
1076+
// push workflow 1
10771077
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
10781078
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
1079-
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
1080-
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
1081-
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent)
1082-
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
1079+
10831080
// fetch wf1-job1 and wf1-job2
10841081
w1j1Task := runner1.fetchTask(t)
10851082
w1j2Task := runner2.fetchTask(t)
1086-
// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
1087-
// cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1"
1088-
runner1.fetchNoTask(t)
1089-
runner2.fetchNoTask(t)
10901083
_, w1j1Job, w1Run := getTaskAndJobAndRunByTaskID(t, w1j1Task.Id)
10911084
assert.Equal(t, "job-group-1", w1j1Job.ConcurrencyGroup)
10921085
assert.Equal(t, "workflow-group-1", w1Run.ConcurrencyGroup)
10931086
assert.Equal(t, "concurrent-workflow-1.yml", w1Run.WorkflowID)
1087+
assert.Equal(t, actions_model.StatusRunning, w1j1Job.Status)
10941088
_, w1j2Job, _ := getTaskAndJobAndRunByTaskID(t, w1j2Task.Id)
10951089
assert.Equal(t, "job-group-2", w1j2Job.ConcurrencyGroup)
1090+
assert.Equal(t, actions_model.StatusRunning, w1j2Job.Status)
1091+
1092+
// push workflow 2
1093+
opts2 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf2TreePath, wf2FileContent)
1094+
createWorkflowFile(t, token, user2.Name, repo.Name, wf2TreePath, opts2)
1095+
// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
1096+
runner1.fetchNoTask(t)
1097+
runner2.fetchNoTask(t)
1098+
// query wf2-job1 from db and check its status
1099+
w2Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-2.yml"})
1100+
w2j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w2Run.ID, JobID: "wf2-job1"})
1101+
assert.Equal(t, actions_model.StatusBlocked, w2j1Job.Status)
1102+
1103+
// push workflow 3
1104+
opts3 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf3TreePath, wf3FileContent)
1105+
createWorkflowFile(t, token, user2.Name, repo.Name, wf3TreePath, opts3)
1106+
// cannot fetch wf3-job1 because it is blocked by wf1-job1's concurrency group "job-group-1"
1107+
runner1.fetchNoTask(t)
1108+
// query wf3-job1 from db and check its status
1109+
w3Run := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRun{RepoID: repo.ID, WorkflowID: "concurrent-workflow-3.yml"})
1110+
w3j1Job := unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{RunID: w3Run.ID, JobID: "wf3-job1"})
1111+
assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status)
1112+
// wf2-job1 is cancelled by wf3-job1
1113+
w2j1Job = unittest.AssertExistsAndLoadBean(t, &actions_model.ActionRunJob{ID: w2j1Job.ID})
1114+
assert.Equal(t, actions_model.StatusCancelled, w2j1Job.Status)
1115+
10961116
// exec wf1-job1
10971117
runner1.execTask(t, w1j1Task, &mockTaskOutcome{
10981118
result: runnerv1.Result_RESULT_SUCCESS,
10991119
})
1120+
11001121
// fetch wf3-job1
1122+
assert.Equal(t, actions_model.StatusBlocked, w3j1Job.Status)
11011123
w3j1Task := runner1.fetchTask(t)
1102-
// cannot fetch wf2-job1 and wf2-job2 because workflow-2 is blocked by workflow-1's concurrency group "workflow-group-1"
1103-
runner1.fetchNoTask(t)
1104-
runner2.fetchNoTask(t)
1105-
_, w3j1Job, w3Run := getTaskAndJobAndRunByTaskID(t, w3j1Task.Id)
1124+
_, w3j1Job, w3Run = getTaskAndJobAndRunByTaskID(t, w3j1Task.Id)
11061125
assert.Equal(t, "job-group-1", w3j1Job.ConcurrencyGroup)
11071126
assert.Equal(t, "workflow-group-2", w3Run.ConcurrencyGroup)
11081127
assert.Equal(t, "concurrent-workflow-3.yml", w3Run.WorkflowID)
1109-
// push workflow-4
1110-
opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf4TreePath, wf4FileContent)
1111-
createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4)
1128+
11121129
// exec wf1-job2
11131130
runner2.execTask(t, w1j2Task, &mockTaskOutcome{
11141131
result: runnerv1.Result_RESULT_SUCCESS,
11151132
})
1116-
// wf2-job2
1133+
1134+
// fetch wf2-job2
11171135
w2j2Task := runner2.fetchTask(t)
1118-
// cannot fetch wf2-job1 because it is blocked by wf3-job1's concurrency group "job-group-1"
1119-
// cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2"
1120-
runner1.fetchNoTask(t)
1121-
runner2.fetchNoTask(t)
11221136
_, w2j2Job, w2Run := getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
11231137
assert.Equal(t, "job-group-2", w2j2Job.ConcurrencyGroup)
11241138
assert.Equal(t, "workflow-group-1", w2Run.ConcurrencyGroup)
11251139
assert.Equal(t, "concurrent-workflow-2.yml", w2Run.WorkflowID)
1140+
assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status)
1141+
1142+
// push workflow-4
1143+
opts4 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf4TreePath, wf4FileContent)
1144+
createWorkflowFile(t, token, user2.Name, repo.Name, wf4TreePath, opts4)
1145+
// cannot fetch wf4-job1 because it is blocked by workflow-3's concurrency group "workflow-group-2"
1146+
runner2.fetchNoTask(t)
1147+
11261148
// exec wf3-job1
11271149
runner1.execTask(t, w3j1Task, &mockTaskOutcome{
11281150
result: runnerv1.Result_RESULT_SUCCESS,
11291151
})
1130-
// fetch wf2-job1
1131-
w2j1Task := runner1.fetchTask(t)
1152+
11321153
// fetch wf4-job1
11331154
w4j1Task := runner2.fetchTask(t)
11341155
// all tasks have been fetched
11351156
runner1.fetchNoTask(t)
11361157
runner2.fetchNoTask(t)
1137-
_, w2j1Job, _ := getTaskAndJobAndRunByTaskID(t, w2j1Task.Id)
1138-
assert.Equal(t, "job-group-1", w2j1Job.ConcurrencyGroup)
1139-
assert.Equal(t, actions_model.StatusRunning, w2j2Job.Status)
1158+
11401159
_, w2j2Job, w2Run = getTaskAndJobAndRunByTaskID(t, w2j2Task.Id)
11411160
// wf2-job2 is cancelled because wf4-job1's cancel-in-progress is true
11421161
assert.Equal(t, actions_model.StatusCancelled, w2j2Job.Status)

0 commit comments

Comments
 (0)