Skip to content

Commit b012126

Browse files
committed
fix: rerun all jobs did not respect concurrency
1 parent 249e315 commit b012126

File tree

2 files changed

+178
-7
lines changed

2 files changed

+178
-7
lines changed

routers/web/repo/actions/view.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -420,26 +420,45 @@ func Rerun(ctx *context_module.Context) {
420420
return
421421
}
422422

423+
// TODO evaluate concurrency expression again, vars may change after the run is done
424+
// check run (workflow-level) concurrency
425+
426+
job, jobs := getRunJobs(ctx, runIndex, jobIndex)
427+
if ctx.Written() {
428+
return
429+
}
430+
431+
var blockRunByConcurrency bool
432+
423433
// reset run's start and stop time when it is done
424434
if run.Status.IsDone() {
425435
run.PreviousDuration = run.Duration()
426436
run.Started = 0
427437
run.Stopped = 0
428-
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration"); err != nil {
438+
439+
blockRunByConcurrency, err = actions_model.ShouldBlockRunByConcurrency(ctx, run)
440+
if err != nil {
441+
ctx.ServerError("ShouldBlockRunByConcurrency", err)
442+
return
443+
}
444+
if blockRunByConcurrency {
445+
run.Status = actions_model.StatusBlocked
446+
} else if err := actions_service.CancelJobsByRunConcurrency(ctx, run); err != nil {
447+
ctx.ServerError("cancel jobs", err)
448+
return
449+
} else {
450+
run.Status = actions_model.StatusRunning
451+
}
452+
if err := actions_model.UpdateRun(ctx, run, "started", "stopped", "previous_duration", "status"); err != nil {
429453
ctx.ServerError("UpdateRun", err)
430454
return
431455
}
432456
}
433457

434-
job, jobs := getRunJobs(ctx, runIndex, jobIndex)
435-
if ctx.Written() {
436-
return
437-
}
438-
439458
if jobIndexStr == "" { // rerun all jobs
440459
for _, j := range jobs {
441460
// if the job has needs, it should be set to "blocked" status to wait for other jobs
442-
shouldBlock := len(j.Needs) > 0
461+
shouldBlock := len(j.Needs) > 0 || blockRunByConcurrency
443462
if err := rerunJob(ctx, j, shouldBlock); err != nil {
444463
ctx.ServerError("RerunJob", err)
445464
return
@@ -453,6 +472,7 @@ func Rerun(ctx *context_module.Context) {
453472

454473
for _, j := range rerunJobs {
455474
// jobs other than the specified one should be set to "blocked" status
475+
// TODO respect blockRunByConcurrency here?
456476
shouldBlock := j.JobID != job.JobID
457477
if err := rerunJob(ctx, j, shouldBlock); err != nil {
458478
ctx.ServerError("RerunJob", err)

tests/integration/actions_concurrency_test.go

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,157 @@ jobs:
574574
})
575575
}
576576

577+
func TestWorkflowDispatchRerunAllJobsConcurrency(t *testing.T) {
578+
onGiteaRun(t, func(t *testing.T, u *url.URL) {
579+
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})
580+
session := loginUser(t, user2.Name)
581+
token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser)
582+
583+
apiRepo := createActionsTestRepo(t, token, "actions-concurrency", false)
584+
repo := unittest.AssertExistsAndLoadBean(t, &repo_model.Repository{ID: apiRepo.ID})
585+
httpContext := NewAPITestContext(t, user2.Name, repo.Name, auth_model.AccessTokenScopeWriteRepository)
586+
defer doAPIDeleteRepository(httpContext)(t)
587+
588+
runner := newMockRunner()
589+
runner.registerAsRepoRunner(t, user2.Name, repo.Name, "mock-runner", []string{"ubuntu-latest"}, false)
590+
591+
wf1TreePath := ".gitea/workflows/workflow-dispatch-concurrency.yml"
592+
wf1FileContent := `name: workflow-dispatch-concurrency
593+
on:
594+
workflow_dispatch:
595+
inputs:
596+
appVersion:
597+
description: 'APP version'
598+
required: true
599+
default: 'v1.23'
600+
type: choice
601+
options:
602+
- v1.21
603+
- v1.22
604+
- v1.23
605+
cancel:
606+
description: 'Cancel running workflows'
607+
required: false
608+
type: boolean
609+
default: false
610+
concurrency:
611+
group: workflow-dispatch-${{ inputs.appVersion }}
612+
cancel-in-progress: ${{ inputs.cancel }}
613+
jobs:
614+
job:
615+
runs-on: ubuntu-latest
616+
steps:
617+
- run: echo 'workflow dispatch job'
618+
`
619+
620+
opts1 := getWorkflowCreateFileOptions(user2, repo.DefaultBranch, "create %s"+wf1TreePath, wf1FileContent)
621+
createWorkflowFile(t, token, user2.Name, repo.Name, wf1TreePath, opts1)
622+
623+
// run the workflow with appVersion=v1.21 and cancel=false
624+
urlStr := fmt.Sprintf("/%s/%s/actions/run?workflow=%s", user2.Name, repo.Name, "workflow-dispatch-concurrency.yml")
625+
req := NewRequestWithValues(t, "POST", urlStr, map[string]string{
626+
"_csrf": GetUserCSRFToken(t, session),
627+
"ref": "refs/heads/main",
628+
"appVersion": "v1.21",
629+
})
630+
session.MakeRequest(t, req, http.StatusSeeOther)
631+
task1 := runner.fetchTask(t)
632+
_, _, run1 := getTaskAndJobAndRunByTaskID(t, task1.Id)
633+
assert.Equal(t, "workflow-dispatch-v1.21", run1.ConcurrencyGroup)
634+
635+
req = NewRequestWithValues(t, "POST", urlStr, map[string]string{
636+
"_csrf": GetUserCSRFToken(t, session),
637+
"ref": "refs/heads/main",
638+
"appVersion": "v1.22",
639+
})
640+
session.MakeRequest(t, req, http.StatusSeeOther)
641+
task2 := runner.fetchTask(t)
642+
_, _, run2 := getTaskAndJobAndRunByTaskID(t, task2.Id)
643+
assert.Equal(t, "workflow-dispatch-v1.22", run2.ConcurrencyGroup)
644+
645+
// run the workflow with appVersion=v1.22 and cancel=false again
646+
req = NewRequestWithValues(t, "POST", urlStr, map[string]string{
647+
"_csrf": GetUserCSRFToken(t, session),
648+
"ref": "refs/heads/main",
649+
"appVersion": "v1.22",
650+
})
651+
session.MakeRequest(t, req, http.StatusSeeOther)
652+
653+
runner.fetchNoTask(t) // cannot fetch task because task2 is not completed
654+
655+
// run the workflow with appVersion=v1.22 and cancel=true
656+
req = NewRequestWithValues(t, "POST", urlStr, map[string]string{
657+
"_csrf": GetUserCSRFToken(t, session),
658+
"ref": "refs/heads/main",
659+
"appVersion": "v1.22",
660+
"cancel": "on",
661+
})
662+
session.MakeRequest(t, req, http.StatusSeeOther)
663+
task4 := runner.fetchTask(t)
664+
_, _, run4 := getTaskAndJobAndRunByTaskID(t, task4.Id)
665+
assert.Equal(t, actions_model.StatusRunning, run4.Status)
666+
assert.Equal(t, "workflow-dispatch-v1.22", run4.ConcurrencyGroup)
667+
_, _, run2 = getTaskAndJobAndRunByTaskID(t, task2.Id)
668+
assert.Equal(t, actions_model.StatusCancelled, run2.Status)
669+
670+
runner.execTask(t, task4, &mockTaskOutcome{
671+
result: runnerv1.Result_RESULT_SUCCESS,
672+
})
673+
674+
// rerun cancel true scenario
675+
676+
req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index), map[string]string{
677+
"_csrf": GetUserCSRFToken(t, session),
678+
})
679+
_ = session.MakeRequest(t, req, http.StatusOK)
680+
681+
req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run4.Index), map[string]string{
682+
"_csrf": GetUserCSRFToken(t, session),
683+
})
684+
_ = session.MakeRequest(t, req, http.StatusOK)
685+
686+
task5 := runner.fetchTask(t)
687+
_, _, run4_1 := getTaskAndJobAndRunByTaskID(t, task5.Id)
688+
assert.Equal(t, "workflow-dispatch-v1.22", run4_1.ConcurrencyGroup)
689+
assert.Equal(t, run4.ID, run4_1.ID)
690+
_, _, run2_1 := getTaskAndJobAndRunByTaskID(t, task2.Id)
691+
assert.Equal(t, actions_model.StatusCancelled, run2_1.Status)
692+
693+
runner.execTask(t, task5, &mockTaskOutcome{
694+
result: runnerv1.Result_RESULT_CANCELLED,
695+
})
696+
697+
// rerun cancel false scenario
698+
699+
req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index), map[string]string{
700+
"_csrf": GetUserCSRFToken(t, session),
701+
})
702+
_ = session.MakeRequest(t, req, http.StatusOK)
703+
704+
req = NewRequestWithValues(t, "POST", fmt.Sprintf("/%s/%s/actions/runs/%d/rerun", user2.Name, apiRepo.Name, run2.Index+1), map[string]string{
705+
"_csrf": GetUserCSRFToken(t, session),
706+
})
707+
_ = session.MakeRequest(t, req, http.StatusOK)
708+
709+
task6 := runner.fetchTask(t)
710+
_, _, run2_2 := getTaskAndJobAndRunByTaskID(t, task6.Id)
711+
assert.Equal(t, "workflow-dispatch-v1.22", run2_2.ConcurrencyGroup)
712+
713+
runner.fetchNoTask(t) // cannot fetch task because task2 is not completed
714+
715+
runner.execTask(t, task6, &mockTaskOutcome{
716+
result: runnerv1.Result_RESULT_SUCCESS,
717+
})
718+
719+
task7 := runner.fetchTask(t)
720+
_, _, run3 := getTaskAndJobAndRunByTaskID(t, task7.Id)
721+
assert.Equal(t, "workflow-dispatch-v1.22", run3.ConcurrencyGroup)
722+
runner.execTask(t, task7, &mockTaskOutcome{
723+
result: runnerv1.Result_RESULT_SUCCESS,
724+
})
725+
})
726+
}
727+
577728
func TestScheduleConcurrency(t *testing.T) {
578729
onGiteaRun(t, func(t *testing.T, u *url.URL) {
579730
user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})

0 commit comments

Comments
 (0)