Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 2 additions & 77 deletions services/actions/notifier_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ import (
api "code.gitea.io/gitea/modules/structs"
webhook_module "code.gitea.io/gitea/modules/webhook"
"code.gitea.io/gitea/services/convert"
notify_service "code.gitea.io/gitea/services/notify"

"github.com/nektos/act/pkg/jobparser"
"github.com/nektos/act/pkg/model"
)

Expand Down Expand Up @@ -346,65 +344,10 @@ func handleWorkflows(

run.NeedApproval = need

if err := run.LoadAttributes(ctx); err != nil {
log.Error("LoadAttributes: %v", err)
if err := PrepareRunAndInsert(ctx, dwf.Content, run, nil); err != nil {
log.Error("PrepareRun: %v", err)
continue
}

vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
log.Error("GetVariablesOfRun: %v", err)
continue
}

wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(dwf.Content)
if err != nil {
log.Error("ReadWorkflowRawConcurrency: %v", err)
continue
}
if wfRawConcurrency != nil {
err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars)
if err != nil {
log.Error("EvaluateRunConcurrencyFillModel: %v", err)
continue
}
}

giteaCtx := GenerateGiteaContext(run, nil)

jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()))
if err != nil {
log.Error("jobparser.Parse: %v", err)
continue
}

if len(jobs) > 0 && jobs[0].RunName != "" {
run.Title = jobs[0].RunName
}

if err := InsertRun(ctx, run, jobs); err != nil {
log.Error("InsertRun: %v", err)
continue
}

alljobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
log.Error("FindRunJobs: %v", err)
continue
}
CreateCommitStatus(ctx, alljobs...)
if len(alljobs) > 0 {
job := alljobs[0]
err := job.LoadRun(ctx)
if err != nil {
log.Error("LoadRun: %v", err)
continue
}
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
}
for _, job := range alljobs {
notify_service.WorkflowJobStatusUpdate(ctx, input.Repo, input.Doer, job, nil)
}
}
return nil
}
Expand Down Expand Up @@ -559,24 +502,6 @@ func handleSchedules(
Content: dwf.Content,
}

vars, err := actions_model.GetVariablesOfRun(ctx, run.ToActionRun())
if err != nil {
log.Error("GetVariablesOfRun: %v", err)
continue
}

giteaCtx := GenerateGiteaContext(run.ToActionRun(), nil)

jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()))
if err != nil {
log.Error("jobparser.Parse: %v", err)
continue
}

if len(jobs) > 0 && jobs[0].RunName != "" {
run.Title = jobs[0].RunName
}

crons = append(crons, run)
}

Expand Down
72 changes: 65 additions & 7 deletions services/actions/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,79 @@ import (

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/util"
notify_service "code.gitea.io/gitea/services/notify"

"github.com/nektos/act/pkg/jobparser"
"gopkg.in/yaml.v3"
)

// PrepareRunAndInsert prepares a run and inserts it into the database
// It parses the workflow content, evaluates concurrency if needed, and inserts the run and its jobs into the database.
// The title will be cut off at 255 characters if it's longer than 255 characters.
func PrepareRunAndInsert(ctx context.Context, content []byte, run *actions_model.ActionRun, inputsWithDefaults map[string]any) error {
if err := run.LoadAttributes(ctx); err != nil {
return fmt.Errorf("LoadAttributes: %w", err)
}

vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
return fmt.Errorf("GetVariablesOfRun: %w", err)
}

wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(content)
if err != nil {
return fmt.Errorf("ReadWorkflowRawConcurrency: %w", err)
}

if wfRawConcurrency != nil {
err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars)
if err != nil {
return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err)
}
}

giteaCtx := GenerateGiteaContext(run, nil)

jobs, err := jobparser.Parse(content, jobparser.WithVars(vars), jobparser.WithGitContext(giteaCtx.ToGitHubContext()), jobparser.WithInputs(inputsWithDefaults))
if err != nil {
return fmt.Errorf("parse workflow: %w", err)
}

if len(jobs) > 0 && jobs[0].RunName != "" {
run.Title = jobs[0].RunName
}

if err := InsertRun(ctx, run, jobs, vars); err != nil {
return fmt.Errorf("InsertRun: %w", err)
}

// FIXME PERF do we need this db round trip?
allJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
log.Error("FindRunJobs: %v", err)
}

// FIXME PERF skip this for schedule, dispatch etc.
CreateCommitStatus(ctx, allJobs...)

err = run.LoadAttributes(ctx)
if err != nil {
log.Error("LoadAttributes: %v", err)
}
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
for _, job := range allJobs {
notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil)
}

// Return nil if no errors occurred
return nil
}

// InsertRun inserts a run
// The title will be cut off at 255 characters if it's longer than 255 characters.
func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) error {
func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow, vars map[string]string) error {
return db.WithTx(ctx, func(ctx context.Context) error {
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
if err != nil {
Expand All @@ -44,12 +108,6 @@ func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobpar
return err
}

// query vars for evaluating job concurrency groups
vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
return fmt.Errorf("get run %d variables: %w", run.ID, err)
}

runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
var hasWaitingJobs bool
for _, v := range jobs {
Expand Down
41 changes: 3 additions & 38 deletions services/actions/schedule_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/timeutil"
webhook_module "code.gitea.io/gitea/modules/webhook"
notify_service "code.gitea.io/gitea/services/notify"

"github.com/nektos/act/pkg/jobparser"
)

// StartScheduleTasks start the task
Expand Down Expand Up @@ -119,44 +116,12 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
Status: actions_model.StatusWaiting,
}

vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
log.Error("GetVariablesOfRun: %v", err)
return err
}

// Parse the workflow specification from the cron schedule
workflows, err := jobparser.Parse(cron.Content, jobparser.WithVars(vars))
if err != nil {
return err
}
wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(cron.Content)
if err != nil {
return err
}
if wfRawConcurrency != nil {
err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars)
if err != nil {
return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err)
}
}

// FIXME cron.Content might be outdated if the workflow file has been changed.
// Load the latest sha from default branch
// Insert the action run and its associated jobs into the database
if err := InsertRun(ctx, run, workflows); err != nil {
if err := PrepareRunAndInsert(ctx, cron.Content, run, nil); err != nil {
return err
}
allJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
log.Error("FindRunJobs: %v", err)
}
err = run.LoadAttributes(ctx)
if err != nil {
log.Error("LoadAttributes: %v", err)
}
notify_service.WorkflowRunStatusUpdate(ctx, run.Repo, run.TriggerUser, run)
for _, job := range allJobs {
notify_service.WorkflowJobStatusUpdate(ctx, run.Repo, run.TriggerUser, job, nil)
}

// Return nil if no errors occurred
return nil
Expand Down
62 changes: 2 additions & 60 deletions services/actions/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@ import (
"strings"

actions_model "code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/models/perm"
access_model "code.gitea.io/gitea/models/perm/access"
repo_model "code.gitea.io/gitea/models/repo"
"code.gitea.io/gitea/models/unit"
user_model "code.gitea.io/gitea/models/user"
"code.gitea.io/gitea/modules/actions"
"code.gitea.io/gitea/modules/git"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/reqctx"
api "code.gitea.io/gitea/modules/structs"
"code.gitea.io/gitea/modules/util"
"code.gitea.io/gitea/services/context"
"code.gitea.io/gitea/services/convert"
notify_service "code.gitea.io/gitea/services/notify"

"github.com/nektos/act/pkg/jobparser"
"github.com/nektos/act/pkg/model"
Expand Down Expand Up @@ -98,9 +95,7 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re
}

// find workflow from commit
var workflows []*jobparser.SingleWorkflow
var entry *git.TreeEntry
var wfRawConcurrency *model.RawConcurrency

run := &actions_model.ActionRun{
Title: strings.SplitN(runTargetCommit.CommitMessage, "\n", 2)[0],
Expand Down Expand Up @@ -153,29 +148,6 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re
}
}

giteaCtx := GenerateGiteaContext(run, nil)

workflows, err = jobparser.Parse(content, jobparser.WithGitContext(giteaCtx.ToGitHubContext()), jobparser.WithInputs(inputsWithDefaults))
if err != nil {
return err
}

if len(workflows) > 0 && workflows[0].RunName != "" {
run.Title = workflows[0].RunName
}

if len(workflows) == 0 {
return util.ErrorWrapLocale(
util.NewNotExistErrorf("workflow %q doesn't exist", workflowID),
"actions.workflow.not_found", workflowID,
)
}

wfRawConcurrency, err = jobparser.ReadWorkflowRawConcurrency(content)
if err != nil {
return err
}

// ctx.Req.PostForm -> WorkflowDispatchPayload.Inputs -> ActionRun.EventPayload -> runner: ghc.Event
// https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
// https://docs.github.com/en/webhooks/webhook-events-and-payloads#workflow_dispatch
Expand All @@ -193,39 +165,9 @@ func DispatchActionWorkflow(ctx reqctx.RequestContext, doer *user_model.User, re
}
run.EventPayload = string(eventPayload)

// cancel running jobs of the same concurrency group
if wfRawConcurrency != nil {
vars, err := actions_model.GetVariablesOfRun(ctx, run)
if err != nil {
return fmt.Errorf("GetVariablesOfRun: %w", err)
}
err = EvaluateRunConcurrencyFillModel(ctx, run, wfRawConcurrency, vars)
if err != nil {
return fmt.Errorf("EvaluateRunConcurrencyFillModel: %w", err)
}
}

// Insert the action run and its associated jobs into the database
if err := InsertRun(ctx, run, workflows); err != nil {
return fmt.Errorf("InsertRun: %w", err)
}

allJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
if err != nil {
log.Error("FindRunJobs: %v", err)
}
CreateCommitStatus(ctx, allJobs...)
if len(allJobs) > 0 {
job := allJobs[0]
err := job.LoadRun(ctx)
if err != nil {
log.Error("LoadRun: %v", err)
} else {
notify_service.WorkflowRunStatusUpdate(ctx, job.Run.Repo, job.Run.TriggerUser, job.Run)
}
}
for _, job := range allJobs {
notify_service.WorkflowJobStatusUpdate(ctx, repo, doer, job, nil)
if err := PrepareRunAndInsert(ctx, content, run, inputsWithDefaults); err != nil {
return fmt.Errorf("PrepareRun: %w", err)
}
return nil
}