diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 52b2e9995e3ba..3422128026ffd 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -429,6 +429,12 @@ func Rerun(ctx *context_module.Context) { ctx.ServerError("UpdateRun", err) return } + + if err := run.LoadAttributes(ctx); err != nil { + ctx.ServerError("run.LoadAttributes", err) + return + } + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) } job, jobs := getRunJobs(ctx, runIndex, jobIndex) @@ -485,7 +491,6 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou } actions_service.CreateCommitStatus(ctx, job) - actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) return nil @@ -560,9 +565,8 @@ func Cancel(ctx *context_module.Context) { if len(updatedjobs) > 0 { job := updatedjobs[0] actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job) - notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) } - ctx.JSON(http.StatusOK, struct{}{}) + ctx.JSONOK() } func Approve(ctx *context_module.Context) { @@ -606,7 +610,6 @@ func Approve(ctx *context_module.Context) { if len(updatedjobs) > 0 { job := updatedjobs[0] actions_service.NotifyWorkflowRunStatusUpdateWithReload(ctx, job) - notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) } for _, job := range updatedjobs { @@ -614,7 +617,7 @@ func Approve(ctx *context_module.Context) { notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } - ctx.JSON(http.StatusOK, struct{}{}) + ctx.JSONOK() } func Delete(ctx *context_module.Context) { diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index 274c04aa57f08..bca38e1af5fa8 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -42,10 +42,8 @@ func notifyWorkflowJobStatusUpdate(ctx context.Context, jobs []*actions_model.Ac _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } - if len(jobs) > 0 { - job := jobs[0] - notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) - } + job := jobs[0] + notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run) } } @@ -113,6 +111,10 @@ func CancelAbandonedJobs(ctx context.Context) error { } now := timeutil.TimeStampNow() + + // Collect one job per run to send workflow run status update + updatedRuns := map[int64]*actions_model.ActionRunJob{} + for _, job := range jobs { job.Status = actions_model.StatusCancelled job.Stopped = now @@ -127,10 +129,24 @@ func CancelAbandonedJobs(ctx context.Context) error { } CreateCommitStatus(ctx, job) if updated { - NotifyWorkflowRunStatusUpdateWithReload(ctx, job) + updatedRuns[job.RunID] = job notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } } + for _, job := range updatedRuns { + c, err := db.Count[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{ + RunID: job.RunID, + Statuses: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusBlocked, actions_model.StatusRunning}, + }) + if err != nil { + log.Error("Count waiting jobs for run %d: %v", job.RunID, err) + continue + } + if c == 0 { + NotifyWorkflowRunStatusUpdateWithReload(ctx, job) + } + } + return nil } diff --git a/services/mailer/mail_workflow_run.go b/services/mailer/mail_workflow_run.go index 29b3abda8ee29..ec6f123139db9 100644 --- a/services/mailer/mail_workflow_run.go +++ b/services/mailer/mail_workflow_run.go @@ -34,6 +34,18 @@ func generateMessageIDForActionsWorkflowRunStatusEmail(repo *repo_model.Reposito } func composeAndSendActionsWorkflowRunStatusEmail(ctx context.Context, repo *repo_model.Repository, run *actions_model.ActionRun, sender *user_model.User, recipients []*user_model.User) { + jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) + if err != nil { + log.Error("GetRunJobsByRunID: %v", err) + return + } + for _, job := range jobs { + if !job.Status.IsDone() { + log.Trace("composeAndSendActionsWorkflowRunStatusEmail: A job is not done. Will not compose and send actions email.") + return + } + } + subject := "Run" switch run.Status { case actions_model.StatusFailure: @@ -48,11 +60,6 @@ func composeAndSendActionsWorkflowRunStatusEmail(ctx context.Context, repo *repo messageID := generateMessageIDForActionsWorkflowRunStatusEmail(repo, run) metadataHeaders := generateMetadataHeaders(repo) - jobs, err := actions_model.GetRunJobsByRunID(ctx, run.ID) - if err != nil { - log.Error("GetRunJobsByRunID: %v", err) - return - } sort.SliceStable(jobs, func(i, j int) bool { si, sj := jobs[i].Status, jobs[j].Status /* @@ -116,6 +123,7 @@ func composeAndSendActionsWorkflowRunStatusEmail(ctx context.Context, repo *repo } msgs := make([]*sender_service.Message, 0, len(tos)) for _, rec := range tos { + log.Trace("Sending actions email to %s (UID: %d)", rec.Name, rec.ID) msg := sender_service.NewMessageFrom( rec.Email, displayName, @@ -141,7 +149,7 @@ func MailActionsTrigger(ctx context.Context, sender *user_model.User, repo *repo if setting.MailService == nil { return } - if run.Status.IsSkipped() { + if !run.Status.IsDone() || run.Status.IsSkipped() { return } @@ -160,6 +168,7 @@ func MailActionsTrigger(ctx context.Context, sender *user_model.User, repo *repo } if len(recipients) > 0 { + log.Trace("MailActionsTrigger: Initiate email composition") composeAndSendActionsWorkflowRunStatusEmail(ctx, repo, run, sender, recipients) } } diff --git a/services/mailer/notify.go b/services/mailer/notify.go index c008685e131c2..ae16b2b429a93 100644 --- a/services/mailer/notify.go +++ b/services/mailer/notify.go @@ -208,8 +208,5 @@ func (m *mailNotifier) RepoPendingTransfer(ctx context.Context, doer, newOwner * } func (m *mailNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) { - if !run.Status.IsDone() { - return - } MailActionsTrigger(ctx, sender, repo, run) } diff --git a/tests/integration/repo_webhook_test.go b/tests/integration/repo_webhook_test.go index f1abac8cfa1fb..8ae9d474248ce 100644 --- a/tests/integration/repo_webhook_test.go +++ b/tests/integration/repo_webhook_test.go @@ -12,6 +12,7 @@ import ( "path" "strings" "testing" + "time" auth_model "code.gitea.io/gitea/models/auth" "code.gitea.io/gitea/models/repo" @@ -24,7 +25,9 @@ import ( "code.gitea.io/gitea/modules/json" "code.gitea.io/gitea/modules/setting" api "code.gitea.io/gitea/modules/structs" + "code.gitea.io/gitea/modules/test" webhook_module "code.gitea.io/gitea/modules/webhook" + "code.gitea.io/gitea/services/actions" "code.gitea.io/gitea/tests" runnerv1 "code.gitea.io/actions-proto-go/runner/v1" @@ -1129,6 +1132,26 @@ func Test_WebhookWorkflowRun(t *testing.T) { name: "WorkflowRunDepthLimit", callback: testWebhookWorkflowRunDepthLimit, }, + { + name: "WorkflowRunDuplicateEvents", + callback: testWorkflowRunDuplicateEvents, + }, + { + name: "WorkflowRunEventDuplicateEventsRerun", + callback: testWorkflowRunDuplicateEventsRerun, + }, + { + name: "WorkflowRunDuplicateEventsCancelAbandoned", + callback: func(t *testing.T, webhookData *workflowRunWebhook) { + testWorkflowRunDuplicateEventsCancelAbandoned(t, webhookData, true) + }, + }, + { + name: "WorkflowRunDuplicateEventsCancelAbandoned", + callback: func(t *testing.T, webhookData *workflowRunWebhook) { + testWorkflowRunDuplicateEventsCancelAbandoned(t, webhookData, false) + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -1141,6 +1164,400 @@ func Test_WebhookWorkflowRun(t *testing.T) { } } +func testWorkflowRunDuplicateEvents(t *testing.T, webhookData *workflowRunWebhook) { + // 1. create a new webhook with special webhook for repo1 + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, "user2") + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + testAPICreateWebhookForRepo(t, session, "user2", "repo1", webhookData.URL, "workflow_run") + + repo1 := unittest.AssertExistsAndLoadBean(t, &repo.Repository{ID: 1}) + + gitRepo1, err := gitrepo.OpenRepository(t.Context(), repo1) + assert.NoError(t, err) + + // 2.2 trigger the webhooks + + // add workflow file to the repo + // init the workflow + wfTreePath := ".gitea/workflows/push.yml" + wfFileContent := `on: + push: + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test2: + needs: [test] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test3: + needs: [test, test2] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test4: + needs: [test, test2, test3] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test5: + needs: [test, test2, test4] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test6: + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04] + needs: [test, test2, test3] + runs-on: ${{ matrix.os }} + steps: + - run: exit 0 + + test7: + needs: test6 + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test8: + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test9: + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04, ubuntu-25.04, windows-2022, windows-2025, macos-13, macos-14, macos-15] + runs-on: ${{ matrix.os }} + steps: + - run: exit 0 + + test10: + runs-on: ubuntu-latest + steps: + - run: exit 0` + opts := getWorkflowCreateFileOptions(user2, repo1.DefaultBranch, "create "+wfTreePath, wfFileContent) + createWorkflowFile(t, token, "user2", "repo1", wfTreePath, opts) + + commitID, err := gitRepo1.GetBranchCommitID(repo1.DefaultBranch) + assert.NoError(t, err) + + // 3. validate the webhook is triggered + assert.Equal(t, "workflow_run", webhookData.triggeredEvent) + assert.Len(t, webhookData.payloads, 1) + assert.Equal(t, "requested", webhookData.payloads[0].Action) + assert.Equal(t, "queued", webhookData.payloads[0].WorkflowRun.Status) + assert.Equal(t, repo1.DefaultBranch, webhookData.payloads[0].WorkflowRun.HeadBranch) + assert.Equal(t, commitID, webhookData.payloads[0].WorkflowRun.HeadSha) + assert.Equal(t, "repo1", webhookData.payloads[0].Repo.Name) + assert.Equal(t, "user2/repo1", webhookData.payloads[0].Repo.FullName) + + // Call cancel ui api + // Only a web UI API exists for cancelling workflow runs, so use the UI endpoint. + cancelURL := fmt.Sprintf("/user2/repo1/actions/runs/%d/cancel", webhookData.payloads[0].WorkflowRun.RunNumber) + req := NewRequestWithValues(t, "POST", cancelURL, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + session.MakeRequest(t, req, http.StatusOK) + + assert.Len(t, webhookData.payloads, 2) + + // 4. Validate the second webhook payload + assert.Equal(t, "workflow_run", webhookData.triggeredEvent) + assert.Equal(t, "completed", webhookData.payloads[1].Action) + assert.Equal(t, "push", webhookData.payloads[1].WorkflowRun.Event) + assert.Equal(t, "completed", webhookData.payloads[1].WorkflowRun.Status) + assert.Equal(t, repo1.DefaultBranch, webhookData.payloads[1].WorkflowRun.HeadBranch) + assert.Equal(t, commitID, webhookData.payloads[1].WorkflowRun.HeadSha) + assert.Equal(t, "repo1", webhookData.payloads[1].Repo.Name) + assert.Equal(t, "user2/repo1", webhookData.payloads[1].Repo.FullName) +} + +func testWorkflowRunDuplicateEventsRerun(t *testing.T, webhookData *workflowRunWebhook) { + // 1. create a new webhook with special webhook for repo1 + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, "user2") + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + runners := make([]*mockRunner, 2) + for i := range runners { + runners[i] = newMockRunner() + runners[i].registerAsRepoRunner(t, "user2", "repo1", fmt.Sprintf("mock-runner-%d", i), []string{"ubuntu-latest"}, false) + } + + testAPICreateWebhookForRepo(t, session, "user2", "repo1", webhookData.URL, "workflow_run") + + repo1 := unittest.AssertExistsAndLoadBean(t, &repo.Repository{ID: 1}) + + gitRepo1, err := gitrepo.OpenRepository(t.Context(), repo1) + assert.NoError(t, err) + + // 2.2 trigger the webhooks + + // add workflow file to the repo + // init the workflow + wfTreePath := ".gitea/workflows/push.yml" + wfFileContent := `on: + push: + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test2: + needs: [test] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test3: + needs: [test, test2] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test4: + needs: [test, test2, test3] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test5: + needs: [test, test2, test4] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test6: + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04] + needs: [test, test2, test3] + runs-on: ${{ matrix.os }} + steps: + - run: exit 0 + + test7: + needs: test6 + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test8: + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test9: + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04, ubuntu-25.04, windows-2022, windows-2025, macos-13, macos-14, macos-15] + runs-on: ${{ matrix.os }} + steps: + - run: exit 0 + + test10: + runs-on: ubuntu-latest + steps: + - run: exit 0` + opts := getWorkflowCreateFileOptions(user2, repo1.DefaultBranch, "create "+wfTreePath, wfFileContent) + createWorkflowFile(t, token, "user2", "repo1", wfTreePath, opts) + + commitID, err := gitRepo1.GetBranchCommitID(repo1.DefaultBranch) + assert.NoError(t, err) + + // 3. validate the webhook is triggered + assert.Equal(t, "workflow_run", webhookData.triggeredEvent) + assert.Len(t, webhookData.payloads, 1) + assert.Equal(t, "requested", webhookData.payloads[0].Action) + assert.Equal(t, "queued", webhookData.payloads[0].WorkflowRun.Status) + assert.Equal(t, repo1.DefaultBranch, webhookData.payloads[0].WorkflowRun.HeadBranch) + assert.Equal(t, commitID, webhookData.payloads[0].WorkflowRun.HeadSha) + assert.Equal(t, "repo1", webhookData.payloads[0].Repo.Name) + assert.Equal(t, "user2/repo1", webhookData.payloads[0].Repo.FullName) + + tasks := make([]*runnerv1.Task, len(runners)) + for i := range runners { + tasks[i] = runners[i].fetchTask(t) + runners[i].execTask(t, tasks[i], &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + } + + // Call cancel ui api + // Only a web UI API exists for cancelling workflow runs, so use the UI endpoint. + cancelURL := fmt.Sprintf("/user2/repo1/actions/runs/%d/cancel", webhookData.payloads[0].WorkflowRun.RunNumber) + req := NewRequestWithValues(t, "POST", cancelURL, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + session.MakeRequest(t, req, http.StatusOK) + + assert.Len(t, webhookData.payloads, 2) + + // 4. Validate the second webhook payload + assert.Equal(t, "workflow_run", webhookData.triggeredEvent) + assert.Equal(t, "completed", webhookData.payloads[1].Action) + assert.Equal(t, "push", webhookData.payloads[1].WorkflowRun.Event) + assert.Equal(t, "completed", webhookData.payloads[1].WorkflowRun.Status) + assert.Equal(t, repo1.DefaultBranch, webhookData.payloads[1].WorkflowRun.HeadBranch) + assert.Equal(t, commitID, webhookData.payloads[1].WorkflowRun.HeadSha) + assert.Equal(t, "repo1", webhookData.payloads[1].Repo.Name) + assert.Equal(t, "user2/repo1", webhookData.payloads[1].Repo.FullName) + + // Call rerun ui api + // Only a web UI API exists for cancelling workflow runs, so use the UI endpoint. + rerunURL := fmt.Sprintf("/user2/repo1/actions/runs/%d/rerun", webhookData.payloads[0].WorkflowRun.RunNumber) + req = NewRequestWithValues(t, "POST", rerunURL, map[string]string{ + "_csrf": GetUserCSRFToken(t, session), + }) + session.MakeRequest(t, req, http.StatusOK) + + assert.Len(t, webhookData.payloads, 3) +} + +func testWorkflowRunDuplicateEventsCancelAbandoned(t *testing.T, webhookData *workflowRunWebhook, partiallyAbandoned bool) { + // 1. create a new webhook with special webhook for repo1 + user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}) + session := loginUser(t, "user2") + token := getTokenForLoggedInUser(t, session, auth_model.AccessTokenScopeWriteRepository, auth_model.AccessTokenScopeWriteUser) + + runners := make([]*mockRunner, 2) + for i := range runners { + runners[i] = newMockRunner() + runners[i].registerAsRepoRunner(t, "user2", "repo1", fmt.Sprintf("mock-runner-%d", i), []string{"ubuntu-latest"}, false) + } + + testAPICreateWebhookForRepo(t, session, "user2", "repo1", webhookData.URL, "workflow_run") + + repo1 := unittest.AssertExistsAndLoadBean(t, &repo.Repository{ID: 1}) + + gitRepo1, err := gitrepo.OpenRepository(t.Context(), repo1) + assert.NoError(t, err) + + // 2.2 trigger the webhooks + + // add workflow file to the repo + // init the workflow + wfTreePath := ".gitea/workflows/push.yml" + wfFileContent := `on: + push: + workflow_dispatch: + +jobs: + test: + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test2: + needs: [test] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test3: + needs: [test, test2] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test4: + needs: [test, test2, test3] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test5: + needs: [test, test2, test4] + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test6: + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04] + needs: [test, test2, test3] + runs-on: ${{ matrix.os }} + steps: + - run: exit 0 + + test7: + needs: test6 + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test8: + runs-on: ubuntu-latest + steps: + - run: exit 0 + + test9: + strategy: + matrix: + os: [ubuntu-20.04, ubuntu-22.04, ubuntu-24.04, ubuntu-25.04, windows-2022, windows-2025, macos-13, macos-14, macos-15] + runs-on: ${{ matrix.os }} + steps: + - run: exit 0 + + test10: + runs-on: ubuntu-latest + steps: + - run: exit 0` + opts := getWorkflowCreateFileOptions(user2, repo1.DefaultBranch, "create "+wfTreePath, wfFileContent) + createWorkflowFile(t, token, "user2", "repo1", wfTreePath, opts) + + commitID, err := gitRepo1.GetBranchCommitID(repo1.DefaultBranch) + assert.NoError(t, err) + + // 3. validate the webhook is triggered + assert.Equal(t, "workflow_run", webhookData.triggeredEvent) + assert.Len(t, webhookData.payloads, 1) + assert.Equal(t, "requested", webhookData.payloads[0].Action) + assert.Equal(t, "queued", webhookData.payloads[0].WorkflowRun.Status) + assert.Equal(t, repo1.DefaultBranch, webhookData.payloads[0].WorkflowRun.HeadBranch) + assert.Equal(t, commitID, webhookData.payloads[0].WorkflowRun.HeadSha) + assert.Equal(t, "repo1", webhookData.payloads[0].Repo.Name) + assert.Equal(t, "user2/repo1", webhookData.payloads[0].Repo.FullName) + + tasks := make([]*runnerv1.Task, len(runners)) + for i := range runners { + tasks[i] = runners[i].fetchTask(t) + if !partiallyAbandoned { + runners[i].execTask(t, tasks[i], &mockTaskOutcome{ + result: runnerv1.Result_RESULT_SUCCESS, + }) + } + } + + defer test.MockVariableValue(&setting.Actions.AbandonedJobTimeout, (time.Duration)(0))() + + err = actions.CancelAbandonedJobs(t.Context()) + assert.NoError(t, err) + + if partiallyAbandoned { + assert.Len(t, webhookData.payloads, 1) + } else { + assert.Len(t, webhookData.payloads, 2) + } +} + func testWebhookWorkflowRun(t *testing.T, webhookData *workflowRunWebhook) { // 1. create a new webhook with special webhook for repo1 user2 := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2})