Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion models/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@
job.Stopped = timeutil.TimeStampNow()

// Update the job's status and stopped time in the database.
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
n, run, runJustFinished, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return cancelledJobs, err
}
Expand All @@ -251,6 +251,12 @@
return cancelledJobs, errors.New("job has changed, try again")
}

if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / backend

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / test-sqlite

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / test-mssql

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / test-pgsql

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / test-mysql

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / test-unit

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service

Check failure on line 256 in models/actions/run.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service
}
}

cancelledJobs = append(cancelledJobs, job)
// Continue with the next job.
continue
Expand Down
23 changes: 14 additions & 9 deletions models/actions/run_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func GetRunJobsByRunID(ctx context.Context, runID int64) (ActionJobList, error)
return jobs, nil
}

func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, error) {
// UpdateRunJob updates a job and returns (affected, updatedRun, runJustFinished, error)
func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, cols ...string) (int64, *ActionRun, bool, error) {
e := db.GetEngine(ctx)

sess := e.ID(job.ID)
Expand All @@ -118,51 +119,55 @@ func UpdateRunJob(ctx context.Context, job *ActionRunJob, cond builder.Cond, col

affected, err := sess.Update(job)
if err != nil {
return 0, err
return 0, nil, false, err
}

if affected == 0 || (!slices.Contains(cols, "status") && job.Status == 0) {
return affected, nil
return affected, nil, false, nil
}

if affected != 0 && slices.Contains(cols, "status") && job.Status.IsWaiting() {
// if the status of job changes to waiting again, increase tasks version.
if err := IncreaseTaskVersion(ctx, job.OwnerID, job.RepoID); err != nil {
return 0, err
return 0, nil, false, err
}
}

if job.RunID == 0 {
var err error
if job, err = GetRunJobByID(ctx, job.ID); err != nil {
return 0, err
return 0, nil, false, err
}
}

var runJustFinished bool
var updatedRun *ActionRun
{
// Other goroutines may aggregate the status of the run and update it too.
// So we need load the run and its jobs before updating the run.
run, err := GetRunByRepoAndID(ctx, job.RepoID, job.RunID)
if err != nil {
return 0, err
return 0, nil, false, err
}
jobs, err := GetRunJobsByRunID(ctx, job.RunID)
if err != nil {
return 0, err
return 0, nil, false, err
}
run.Status = AggregateJobStatus(jobs)
if run.Started.IsZero() && run.Status.IsRunning() {
run.Started = timeutil.TimeStampNow()
}
if run.Stopped.IsZero() && run.Status.IsDone() {
run.Stopped = timeutil.TimeStampNow()
runJustFinished = true
}
if err := UpdateRun(ctx, run, "status", "started", "stopped"); err != nil {
return 0, fmt.Errorf("update run %d: %w", run.ID, err)
return 0, nil, false, fmt.Errorf("update run %d: %w", run.ID, err)
}
updatedRun = run
}

return affected, nil
return affected, updatedRun, runJustFinished, nil
}

func AggregateJobStatus(jobs []*ActionRunJob) Status {
Expand Down
18 changes: 15 additions & 3 deletions models/actions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,10 +315,14 @@
}

job.TaskID = task.ID
if n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
if n, run, runJustFinished, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}); err != nil {
return nil, false, err
} else if n != 1 {
return nil, false, nil
} else if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / backend

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-sqlite

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-mssql

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-pgsql

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-mysql

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-unit

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service

Check failure on line 324 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service
}
}

task.Job = job
Expand Down Expand Up @@ -382,12 +386,16 @@
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
return nil, err
}
if _, err := UpdateRunJob(ctx, &ActionRunJob{
if _, run, runJustFinished, err := UpdateRunJob(ctx, &ActionRunJob{
ID: task.JobID,
Status: task.Status,
Stopped: task.Stopped,
}, nil); err != nil {
return nil, err
} else if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / backend

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-sqlite

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-mssql

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-pgsql

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-mysql

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-unit

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service

Check failure on line 397 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service
}
}
} else {
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
Expand Down Expand Up @@ -446,12 +454,16 @@
now := timeutil.TimeStampNow()
task.Status = status
task.Stopped = now
if _, err := UpdateRunJob(ctx, &ActionRunJob{
if _, run, runJustFinished, err := UpdateRunJob(ctx, &ActionRunJob{
ID: task.JobID,
Status: task.Status,
Stopped: task.Stopped,
}, nil); err != nil {
return err
} else if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / backend

undefined: notify_service

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-sqlite

undefined: notify_service

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-mssql

undefined: notify_service

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-pgsql

undefined: notify_service

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-mysql

undefined: notify_service

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / test-unit

undefined: notify_service

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service (typecheck)

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-windows

undefined: notify_service) (typecheck)

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service (typecheck)

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-backend

undefined: notify_service) (typecheck)

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service (typecheck)

Check failure on line 465 in models/actions/task.go

View workflow job for this annotation

GitHub Actions / lint-go-gogit

undefined: notify_service) (typecheck)
}
}

if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
Expand Down
27 changes: 21 additions & 6 deletions routers/web/repo/actions/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,16 @@ func rerunJob(ctx *context_module.Context, job *actions_model.ActionRunJob, shou
job.Stopped = 0

if err := db.WithTx(ctx, func(ctx context.Context) error {
_, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped")
return err
_, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": status}, "task_id", "status", "started", "stopped")
if err != nil {
return err
}
if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
}
}
return nil
}); err != nil {
return err
}
Expand Down Expand Up @@ -499,12 +507,14 @@ func Cancel(ctx *context_module.Context) {
if job.TaskID == 0 {
job.Status = actions_model.StatusCancelled
job.Stopped = timeutil.TimeStampNow()
n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
if err != nil {
return err
}
if n == 0 {
return errors.New("job has changed, try again")
if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
}
}
if n > 0 {
updatedjobs = append(updatedjobs, job)
Expand Down Expand Up @@ -552,10 +562,15 @@ func Approve(ctx *context_module.Context) {
for _, job := range jobs {
if len(job.Needs) == 0 && job.Status.IsBlocked() {
job.Status = actions_model.StatusWaiting
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
if err != nil {
return err
}
if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
}
}
if n > 0 {
updatedjobs = append(updatedjobs, job)
}
Expand Down
15 changes: 8 additions & 7 deletions services/actions/clear_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,17 +112,18 @@ func CancelAbandonedJobs(ctx context.Context) error {
for _, job := range jobs {
job.Status = actions_model.StatusCancelled
job.Stopped = now
updated := false
if err := db.WithTx(ctx, func(ctx context.Context) error {
n, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "stopped")
updated = err == nil && n > 0
return err
}); err != nil {
n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, nil, "status", "stopped")
if err != nil {
log.Warn("cancel abandoned job %v: %v", job.ID, err)
// go on
}
if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
}
}
CreateCommitStatus(ctx, job)
if updated {
if n > 0 {
_ = job.LoadAttributes(ctx)
notify_service.WorkflowJobStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job, nil)
}
Expand Down
6 changes: 5 additions & 1 deletion services/actions/job_emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,14 @@ func checkJobsOfRun(ctx context.Context, runID int64) error {
for _, job := range jobs {
if status, ok := updates[job.ID]; ok {
job.Status = status
if n, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
if n, run, runJustFinished, err := actions_model.UpdateRunJob(ctx, job, builder.Eq{"status": actions_model.StatusBlocked}, "status"); err != nil {
return err
} else if n != 1 {
return fmt.Errorf("no affected for updating blocked job %v", job.ID)
} else if runJustFinished && run != nil {
if err := run.LoadAttributes(ctx); err == nil && run.TriggerUser != nil {
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
}
}
updatedjobs = append(updatedjobs, job)
}
Expand Down
12 changes: 12 additions & 0 deletions services/mailer/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"code.gitea.io/gitea/modules/log"
issue_service "code.gitea.io/gitea/services/issue"
notify_service "code.gitea.io/gitea/services/notify"
actions_model "code.gitea.io/gitea/models/actions"
)

type mailNotifier struct {
Expand Down Expand Up @@ -205,3 +206,14 @@ func (m *mailNotifier) RepoPendingTransfer(ctx context.Context, doer, newOwner *
log.Error("SendRepoTransferNotifyMail: %v", err)
}
}

func (m *mailNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) {
if run == nil || run.TriggerUser == nil || run.TriggerUser.EmailNotificationsPreference == user_model.EmailNotificationsDisabled {
return
}
subject := fmt.Sprintf("[Gitea] Workflow '%s' %s in %s", run.WorkflowID, run.Status.String(), repo.FullName())
body := fmt.Sprintf("Workflow '%s' in repository '%s' has %s.\n\nRun details: %s", run.WorkflowID, repo.FullName(), run.Status.String(), run.HTMLURL())
if err := SendUserMail(ctx, run.TriggerUser, subject, body); err != nil {
log.Error("Failed to send workflow run notification: %v", err)
}
}
1 change: 1 addition & 0 deletions services/notify/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,5 @@ type Notifier interface {
CreateCommitStatus(ctx context.Context, repo *repo_model.Repository, commit *repository.PushCommit, sender *user_model.User, status *git_model.CommitStatus)

WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask)
WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun)
}
6 changes: 6 additions & 0 deletions services/notify/notify.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,3 +381,9 @@ func WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, s
notifier.WorkflowJobStatusUpdate(ctx, repo, sender, job, task)
}
}

func WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) {
for _, notifier := range notifiers {
notifier.WorkflowRunStatusUpdate(ctx, repo, sender, run)
}
}
3 changes: 3 additions & 0 deletions services/notify/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,6 @@ func (*NullNotifier) CreateCommitStatus(ctx context.Context, repo *repo_model.Re

func (*NullNotifier) WorkflowJobStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, job *actions_model.ActionRunJob, task *actions_model.ActionTask) {
}

func (*NullNotifier) WorkflowRunStatusUpdate(ctx context.Context, repo *repo_model.Repository, sender *user_model.User, run *actions_model.ActionRun) {
}
Loading