@@ -18,130 +18,125 @@ import (
1818// InsertRun inserts a run
1919// The title will be cut off at 255 characters if it's longer than 255 characters.
2020func InsertRun (ctx context.Context , run * actions_model.ActionRun , jobs []* jobparser.SingleWorkflow ) error {
21- ctx , committer , err := db .TxContext (ctx )
22- if err != nil {
23- return err
24- }
25- defer committer .Close ()
26-
27- index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
28- if err != nil {
29- return err
30- }
31- run .Index = index
32- run .Title = util .EllipsisDisplayString (run .Title , 255 )
33-
34- // check run (workflow-level) concurrency
35- blockRunByConcurrency , err := actions_model .ShouldBlockRunByConcurrency (ctx , run )
36- if err != nil {
37- return err
38- }
39- if blockRunByConcurrency {
40- run .Status = actions_model .StatusBlocked
41- }
42- if err := CancelJobsByRunConcurrency (ctx , run ); err != nil {
43- return fmt .Errorf ("cancel jobs: %w" , err )
44- }
45-
46- if err := db .Insert (ctx , run ); err != nil {
47- return err
48- }
49-
50- if run .Repo == nil {
51- repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
21+ return db .WithTx (ctx , func (ctx context.Context ) error {
22+ index , err := db .GetNextResourceIndex (ctx , "action_run_index" , run .RepoID )
5223 if err != nil {
5324 return err
5425 }
55- run .Repo = repo
56- }
57-
58- if err := actions_model .UpdateRepoRunsNumbers (ctx , run .Repo ); err != nil {
59- return err
60- }
61-
62- // query vars for evaluating job concurrency groups
63- vars , err := actions_model .GetVariablesOfRun (ctx , run )
64- if err != nil {
65- return fmt .Errorf ("get run %d variables: %w" , run .ID , err )
66- }
67-
68- runJobs := make ([]* actions_model.ActionRunJob , 0 , len (jobs ))
69- var hasWaiting bool
70- for _ , v := range jobs {
71- id , job := v .Job ()
72- needs := job .Needs ()
73- if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
26+ run .Index = index
27+ run .Title = util .EllipsisDisplayString (run .Title , 255 )
28+
29+ // check run (workflow-level) concurrency
30+ blockRunByConcurrency , err := actions_model .ShouldBlockRunByConcurrency (ctx , run )
31+ if err != nil {
7432 return err
7533 }
76- payload , _ := v .Marshal ()
77- status := actions_model .StatusWaiting
78- if len (needs ) > 0 || run .NeedApproval || run .Status == actions_model .StatusBlocked {
79- status = actions_model .StatusBlocked
80- } else {
81- hasWaiting = true
34+ if blockRunByConcurrency {
35+ run .Status = actions_model .StatusBlocked
8236 }
83- job .Name = util .EllipsisDisplayString (job .Name , 255 )
84- runJob := & actions_model.ActionRunJob {
85- RunID : run .ID ,
86- RepoID : run .RepoID ,
87- OwnerID : run .OwnerID ,
88- CommitSHA : run .CommitSHA ,
89- IsForkPullRequest : run .IsForkPullRequest ,
90- Name : job .Name ,
91- WorkflowPayload : payload ,
92- JobID : id ,
93- Needs : needs ,
94- RunsOn : job .RunsOn (),
95- Status : status ,
37+ if err := CancelJobsByRunConcurrency (ctx , run ); err != nil {
38+ return fmt .Errorf ("cancel jobs: %w" , err )
9639 }
9740
98- // check job concurrency
99- if job .RawConcurrency != nil && job .RawConcurrency .Group != "" {
100- runJob .RawConcurrencyGroup = job .RawConcurrency .Group
101- runJob .RawConcurrencyCancel = job .RawConcurrency .CancelInProgress
102- // do not evaluate job concurrency when it requires `needs`
103- if len (needs ) == 0 {
104- var err error
105- runJob .ConcurrencyGroup , runJob .ConcurrencyCancel , err = EvaluateJobConcurrency (ctx , run , runJob , vars , nil )
106- if err != nil {
107- return fmt .Errorf ("evaluate job concurrency: %w" , err )
108- }
109- runJob .IsConcurrencyEvaluated = true
110- }
111- // do not need to check job concurrency if the job is blocked because it will be checked by job emitter
112- if runJob .Status != actions_model .StatusBlocked {
113- // check if the job should be blocked by job concurrency
114- blockByConcurrency , err := actions_model .ShouldBlockJobByConcurrency (ctx , runJob )
115- if err != nil {
116- return err
117- }
118- if blockByConcurrency {
119- runJob .Status = actions_model .StatusBlocked
120- }
121- if err := CancelJobsByJobConcurrency (ctx , runJob ); err != nil {
122- return fmt .Errorf ("cancel jobs: %w" , err )
123- }
41+ if err := db .Insert (ctx , run ); err != nil {
42+ return err
43+ }
44+
45+ if run .Repo == nil {
46+ repo , err := repo_model .GetRepositoryByID (ctx , run .RepoID )
47+ if err != nil {
48+ return err
12449 }
50+ run .Repo = repo
12551 }
12652
127- if err := db . Insert (ctx , runJob ); err != nil {
53+ if err := actions_model . UpdateRepoRunsNumbers (ctx , run . Repo ); err != nil {
12854 return err
12955 }
13056
131- runJobs = append (runJobs , runJob )
132- }
57+ // query vars for evaluating job concurrency groups
58+ vars , err := actions_model .GetVariablesOfRun (ctx , run )
59+ if err != nil {
60+ return fmt .Errorf ("get run %d variables: %w" , run .ID , err )
61+ }
13362
134- run .Status = actions_model .AggregateJobStatus (runJobs )
135- if err := actions_model .UpdateRun (ctx , run , "status" ); err != nil {
136- return err
137- }
63+ runJobs := make ([]* actions_model.ActionRunJob , 0 , len (jobs ))
64+ var hasWaiting bool
65+ for _ , v := range jobs {
66+ id , job := v .Job ()
67+ needs := job .Needs ()
68+ if err := v .SetJob (id , job .EraseNeeds ()); err != nil {
69+ return err
70+ }
71+ payload , _ := v .Marshal ()
72+ status := actions_model .StatusWaiting
73+ if len (needs ) > 0 || run .NeedApproval || run .Status == actions_model .StatusBlocked {
74+ status = actions_model .StatusBlocked
75+ } else {
76+ hasWaiting = true
77+ }
78+ job .Name = util .EllipsisDisplayString (job .Name , 255 )
79+ runJob := & actions_model.ActionRunJob {
80+ RunID : run .ID ,
81+ RepoID : run .RepoID ,
82+ OwnerID : run .OwnerID ,
83+ CommitSHA : run .CommitSHA ,
84+ IsForkPullRequest : run .IsForkPullRequest ,
85+ Name : job .Name ,
86+ WorkflowPayload : payload ,
87+ JobID : id ,
88+ Needs : needs ,
89+ RunsOn : job .RunsOn (),
90+ Status : status ,
91+ }
92+ // check job concurrency
93+ if job .RawConcurrency != nil && job .RawConcurrency .Group != "" {
94+ runJob .RawConcurrencyGroup = job .RawConcurrency .Group
95+ runJob .RawConcurrencyCancel = job .RawConcurrency .CancelInProgress
96+ // do not evaluate job concurrency when it requires `needs`
97+ if len (needs ) == 0 {
98+ var err error
99+ runJob .ConcurrencyGroup , runJob .ConcurrencyCancel , err = EvaluateJobConcurrency (ctx , run , runJob , vars , nil )
100+ if err != nil {
101+ return fmt .Errorf ("evaluate job concurrency: %w" , err )
102+ }
103+ runJob .IsConcurrencyEvaluated = true
104+ }
105+ // do not need to check job concurrency if the job is blocked because it will be checked by job emitter
106+ if runJob .Status != actions_model .StatusBlocked {
107+ // check if the job should be blocked by job concurrency
108+ blockByConcurrency , err := actions_model .ShouldBlockJobByConcurrency (ctx , runJob )
109+ if err != nil {
110+ return err
111+ }
112+ if blockByConcurrency {
113+ runJob .Status = actions_model .StatusBlocked
114+ }
115+ if err := CancelJobsByJobConcurrency (ctx , runJob ); err != nil {
116+ return fmt .Errorf ("cancel jobs: %w" , err )
117+ }
118+ }
119+ }
120+
121+ if err := db .Insert (ctx , runJob ); err != nil {
122+ return err
123+ }
124+
125+ runJobs = append (runJobs , runJob )
126+ }
138127
139- // if there is a job in the waiting status, increase tasks version.
140- if hasWaiting {
141- if err := actions_model .IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
128+ run .Status = actions_model .AggregateJobStatus (runJobs )
129+ if err := actions_model .UpdateRun (ctx , run , "status" ); err != nil {
142130 return err
143131 }
144- }
145132
146- return committer .Commit ()
133+ // if there is a job in the waiting status, increase tasks version.
134+ if hasWaiting {
135+ if err := actions_model .IncreaseTaskVersion (ctx , run .OwnerID , run .RepoID ); err != nil {
136+ return err
137+ }
138+ }
139+
140+ return nil
141+ })
147142}
0 commit comments