From 0f678cbdb0001808454d165901e0d650b79718f1 Mon Sep 17 00:00:00 2001 From: luffy-orf Date: Sun, 25 May 2025 12:33:24 +0530 Subject: [PATCH 1/2] feat(actions): send email notification on workflow run completion --- models/actions/run_job.go | 9 +++++++++ services/mailer/notify.go | 12 ++++++++++++ services/notify/notifier.go | 1 + services/notify/notify.go | 6 ++++++ services/notify/null.go | 3 +++ 5 files changed, 31 insertions(+) diff --git a/models/actions/run_job.go b/models/actions/run_job.go index c0df19b020c2a..b02398a8da498 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -13,6 +13,7 @@ import ( repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" + "code.gitea.io/gitea/services/notify" "xorm.io/builder" ) @@ -156,6 +157,14 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } if run.Stopped.IsZero() && run.Status.IsDone() { run.Stopped = timeutil.TimeStampNow() + // Send workflow run completion notification + import_notify_service := false + // Add import if not present + // import notify_service "code.gitea.io/gitea/services/notify" + // Load TriggerUser if not loaded + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } } if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil { return 0, fmt.Errorf("update run %d: %w", run.ID, err) diff --git a/services/mailer/notify.go b/services/mailer/notify.go index 77c366fe3195d..a1974263bbe46 100644 --- a/services/mailer/notify.go +++ b/services/mailer/notify.go @@ -14,6 +14,7 @@ import ( "code.gitea.io/gitea/modules/log" issue_service "code.gitea.io/gitea/services/issue" notify_service "code.gitea.io/gitea/services/notify" + actions_model "code.gitea.io/gitea/models/actions" ) type mailNotifier struct { @@ -205,3 +206,14 @@ func (m *mailNotifier) RepoPendingTransfer(ctx context.Context, doer, newOwner * log.Error("SendRepoTransferNotifyMail: %v", err) } } + +func (m *mailNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) { + if run == nil || run.TriggerUser == nil || run.TriggerUser.EmailNotificationsPreference == user_model.EmailNotificationsDisabled { + return + } + subject := fmt.Sprintf("[Gitea] Workflow '%s' %s in %s", run.WorkflowID, run.Status.String(), repo.FullName()) + body := fmt.Sprintf("Workflow '%s' in repository '%s' has %s.\n\nRun details: %s", run.WorkflowID, repo.FullName(), run.Status.String(), run.HTMLURL()) + if err := SendUserMail(ctx, run.TriggerUser, subject, body); err != nil { + log.Error("Failed to send workflow run notification: %v", err) + } +} diff --git a/services/notify/notifier.go b/services/notify/notifier.go index 40428454be0af..d9cddad14e142 100644 --- a/services/notify/notifier.go +++ b/services/notify/notifier.go @@ -80,4 +80,5 @@ type Notifier interface { CreateCommitStatus(ctx context.Context, repo *repo_model.Repository, commit *repository.PushCommit, sender *user_model.User, status *git_model.CommitStatus) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) + WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) } diff --git a/services/notify/notify.go b/services/notify/notify.go index 9f8be4b577373..b35bfa4a75f98 100644 --- a/services/notify/notify.go +++ b/services/notify/notify.go @@ -381,3 +381,9 @@ func WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, s notifier.WorkflowJobStatusUpdate(ctx, repo, sender, job, task) } } + +func WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) { + for _, notifier := range notifiers { + notifier.WorkflowRunStatusUpdate(ctx, repo, sender, run) + } +} diff --git a/services/notify/null.go b/services/notify/null.go index 9c794a2342cf7..73fed8e7e74e8 100644 --- a/services/notify/null.go +++ b/services/notify/null.go @@ -216,3 +216,6 @@ func (*NullNotifier) CreateCommitStatus(ctx context.Context, repo *repo_model.Re func (*NullNotifier) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) { } + +func (*NullNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) { +} From a72d20e4b18347e908cb92bcf995ddb2c9d4f3a0 Mon Sep 17 00:00:00 2001 From: luffy-orf Date: Sun, 25 May 2025 12:49:40 +0530 Subject: [PATCH 2/2] fix --- models/actions/run.go | 8 +++++++- models/actions/run_job.go | 32 ++++++++++++++------------------ models/actions/task.go | 18 +++++++++++++++--- routers/web/repo/actions/view.go | 27 +++++++++++++++++++++------ services/actions/clear_tasks.go | 15 ++++++++------- services/actions/job_emitter.go | 6 +++++- 6 files changed, 70 insertions(+), 36 deletions(-) diff --git a/models/actions/run.go b/models/actions/run.go index 498a73dc201ab..481916fd67ea2 100644 --- a/models/actions/run.go +++ b/models/actions/run.go @@ -241,7 +241,7 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin job.Stopped = timeutil.TimeStampNow() // Update the job's status and stopped time in the database. - n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + n, run, runJustFinished, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") if err != nil { return cancelledJobs, err } @@ -251,6 +251,12 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin return cancelledJobs, errors.New("job has changed, try again") } + if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } + } + cancelledJobs = append(cancelledJobs, job) // Continue with the next job. continue diff --git a/models/actions/run_job.go b/models/actions/run_job.go index b02398a8da498..2219db5f8b65d 100644 --- a/models/actions/run_job.go +++ b/models/actions/run_job.go @@ -13,7 +13,6 @@ import ( repo_model "code.gitea.io/gitea/models/repo" "code.gitea.io/gitea/modules/timeutil" "code.gitea.io/gitea/modules/util" - "code.gitea.io/gitea/services/notify" "xorm.io/builder" ) @@ -105,7 +104,8 @@ func GetRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error) return jobs, nil } -func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) { +// UpdateRunJob updates a job and returns (affected, updatedRun, runJustFinished, error) +func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, *ActionRun, bool, error) { e := db.GetEngine(ctx) sess := e.ID(job.ID) @@ -119,37 +119,39 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col affected, err := sess.Update(job) if err != nil { - return 0, err + return 0, nil, false, err } if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) { - return affected, nil + return affected, nil, false, nil } if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() { // if the status of job changes to waiting again, increase tasks version. if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil { - return 0, err + return 0, nil, false, err } } if job.RunID == 0 { var err error if job, err = GetRunJobByID(ctx, job.ID); err != nil { - return 0, err + return 0, nil, false, err } } + var runJustFinished bool + var updatedRun *ActionRun { // Other goroutines may aggregate the status of the run and update it too. // So we need load the run and its jobs before updating the run. run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID) if err != nil { - return 0, err + return 0, nil, false, err } jobs, err := GetRunJobsByRunID(ctx, job.RunID) if err != nil { - return 0, err + return 0, nil, false, err } run.Status = AggregateJobStatus(jobs) if run.Started.IsZero() && run.Status.IsRunning() { @@ -157,21 +159,15 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col } if run.Stopped.IsZero() && run.Status.IsDone() { run.Stopped = timeutil.TimeStampNow() - // Send workflow run completion notification - import_notify_service := false - // Add import if not present - // import notify_service "code.gitea.io/gitea/services/notify" - // Load TriggerUser if not loaded - if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { - notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) - } + runJustFinished = true } if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil { - return 0, fmt.Errorf("update run %d: %w", run.ID, err) + return 0, nil, false, fmt.Errorf("update run %d: %w", run.ID, err) } + updatedRun = run } - return affected, nil + return affected, updatedRun, runJustFinished, nil } func AggregateJobStatus(jobs []*ActionRunJob) Status { diff --git a/models/actions/task.go b/models/actions/task.go index 63259582f6374..97f752324bc73 100644 --- a/models/actions/task.go +++ b/models/actions/task.go @@ -315,10 +315,14 @@ func CreateTaskForRunner(ctx context.Context, runner *ActionRunner) (*ActionTask } job.TaskID = task.ID - if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { + if n, run, runJustFinished, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil { return nil, false, err } else if n != 1 { return nil, false, nil + } else if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } } task.Job = job @@ -382,12 +386,16 @@ func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.Task if err := UpdateTask(ctx, task, "status", "stopped"); err != nil { return nil, err } - if _, err := UpdateRunJob(ctx, &ActionRunJob{ + if _, run, runJustFinished, err := UpdateRunJob(ctx, &ActionRunJob{ ID: task.JobID, Status: task.Status, Stopped: task.Stopped, }, nil); err != nil { return nil, err + } else if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } } } else { // Force update ActionTask.Updated to avoid the task being judged as a zombie task @@ -446,12 +454,16 @@ func StopTask(ctx context.Context, taskID int64, status Status) error { now := timeutil.TimeStampNow() task.Status = status task.Stopped = now - if _, err := UpdateRunJob(ctx, &ActionRunJob{ + if _, run, runJustFinished, err := UpdateRunJob(ctx, &ActionRunJob{ ID: task.JobID, Status: task.Status, Stopped: task.Stopped, }, nil); err != nil { return err + } else if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } } if err := UpdateTask(ctx, task, "status", "stopped"); err != nil { diff --git a/routers/web/repo/actions/view.go b/routers/web/repo/actions/view.go index 13b19862ff7c2..884441c1e1a2d 100644 --- a/routers/web/repo/actions/view.go +++ b/routers/web/repo/actions/view.go @@ -448,8 +448,16 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou job.Stopped = 0 if err := db.WithTx(ctx, func(ctx context.Context) error { - _, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped") - return err + _, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped") + if err != nil { + return err + } + if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } + } + return nil }); err != nil { return err } @@ -499,12 +507,14 @@ func Cancel(ctx *context_module.Context) { if job.TaskID == 0 { job.Status = actions_model.StatusCancelled job.Stopped = timeutil.TimeStampNow() - n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") + n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped") if err != nil { return err } - if n == 0 { - return errors.New("job has changed, try again") + if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } } if n > 0 { updatedjobs = append(updatedjobs, job) @@ -552,10 +562,15 @@ func Approve(ctx *context_module.Context) { for _, job := range jobs { if len(job.Needs) == 0 && job.Status.IsBlocked() { job.Status = actions_model.StatusWaiting - n, err := actions_model.UpdateRunJob(ctx, job, nil, "status") + n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, nil, "status") if err != nil { return err } + if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } + } if n > 0 { updatedjobs = append(updatedjobs, job) } diff --git a/services/actions/clear_tasks.go b/services/actions/clear_tasks.go index 2aeb0e8c96fc6..d5c76cf89e54b 100644 --- a/services/actions/clear_tasks.go +++ b/services/actions/clear_tasks.go @@ -112,17 +112,18 @@ func CancelAbandonedJobs(ctx context.Context) error { for _, job := range jobs { job.Status = actions_model.StatusCancelled job.Stopped = now - updated := false - if err := db.WithTx(ctx, func(ctx context.Context) error { - n, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "stopped") - updated = err == nil && n > 0 - return err - }); err != nil { + n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "stopped") + if err != nil { log.Warn("cancel abandoned job %v: %v", job.ID, err) // go on } + if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } + } CreateCommitStatus(ctx, job) - if updated { + if n > 0 { _ = job.LoadAttributes(ctx) notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil) } diff --git a/services/actions/job_emitter.go b/services/actions/job_emitter.go index c11bb5875f45c..e06b454d75625 100644 --- a/services/actions/job_emitter.go +++ b/services/actions/job_emitter.go @@ -61,10 +61,14 @@ func checkJobsOfRun(ctx context.Context, runID int64) error { for _, job := range jobs { if status, ok := updates[job.ID]; ok { job.Status = status - if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { + if n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil { return err } else if n != 1 { return fmt.Errorf("no affected for updating blocked job %v", job.ID) + } else if runJustFinished && run != nil { + if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil { + notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run) + } } updatedjobs = append(updatedjobs, job) }