@@ -62,95 +62,11 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
6262			jobs  =  append (jobs , js ... )
6363			updatedJobs  =  append (updatedJobs , ujs ... )
6464		}
65- 		// check run (workflow-level) concurrency 
66- 		concurrentRunIDs  :=  make (container.Set [int64 ])
67- 		concurrentRunIDs .Add (run .ID )
68- 		if  run .ConcurrencyGroup  !=  ""  {
69- 			concurrentRuns , err  :=  db .Find [actions_model.ActionRun ](ctx , actions_model.FindRunOptions {
70- 				RepoID :           run .RepoID ,
71- 				ConcurrencyGroup : run .ConcurrencyGroup ,
72- 				Status :           []actions_model.Status {actions_model .StatusBlocked },
73- 			})
74- 			if  err  !=  nil  {
75- 				return  err 
76- 			}
77- 			for  _ , concurrentRun  :=  range  concurrentRuns  {
78- 				if  concurrentRunIDs .Contains (concurrentRun .ID ) {
79- 					continue 
80- 				}
81- 				concurrentRunIDs .Add (concurrentRun .ID )
82- 				if  concurrentRun .NeedApproval  {
83- 					continue 
84- 				}
85- 				if  js , ujs , err  :=  checkJobsOfRun (ctx , concurrentRun ); err  !=  nil  {
86- 					return  err 
87- 				} else  {
88- 					jobs  =  append (jobs , js ... )
89- 					updatedJobs  =  append (updatedJobs , ujs ... )
90- 				}
91- 				updatedRun , err  :=  actions_model .GetRunByID (ctx , concurrentRun .ID )
92- 				if  err  !=  nil  {
93- 					return  err 
94- 				}
95- 				if  updatedRun .Status  ==  actions_model .StatusWaiting  {
96- 					// only run one blocked action run in the same concurrency group 
97- 					break 
98- 				}
99- 			}
100- 		}
101- 
102- 		// check job concurrency 
103- 		runJobs , err  :=  db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {RunID : run .ID })
104- 		if  err  !=  nil  {
65+ 		if  js , ujs , err  :=  checkRunConcurrency (ctx , run ); err  !=  nil  {
10566			return  err 
106- 		}
107- 		for  _ , job  :=  range  runJobs  {
108- 			if  job .Status .IsDone () &&  job .ConcurrencyGroup  !=  ""  {
109- 				waitingConcurrentJobs , err  :=  db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {
110- 					RepoID :           job .RepoID ,
111- 					ConcurrencyGroup : job .ConcurrencyGroup ,
112- 					Statuses :         []actions_model.Status {actions_model .StatusWaiting },
113- 				})
114- 				if  err  !=  nil  {
115- 					return  err 
116- 				}
117- 				if  len (waitingConcurrentJobs ) ==  0  {
118- 					blockedConcurrentJobs , err  :=  db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {
119- 						RepoID :           job .RepoID ,
120- 						ConcurrencyGroup : job .ConcurrencyGroup ,
121- 						Statuses :         []actions_model.Status {actions_model .StatusBlocked },
122- 					})
123- 					if  err  !=  nil  {
124- 						return  err 
125- 					}
126- 					for  _ , concurrentJob  :=  range  blockedConcurrentJobs  {
127- 						if  concurrentRunIDs .Contains (concurrentJob .RunID ) {
128- 							continue 
129- 						}
130- 						concurrentRunIDs .Add (concurrentJob .RunID )
131- 						concurrentRun , err  :=  actions_model .GetRunByID (ctx , concurrentJob .RunID )
132- 						if  err  !=  nil  {
133- 							return  err 
134- 						}
135- 						if  concurrentRun .NeedApproval  {
136- 							continue 
137- 						}
138- 						if  js , ujs , err  :=  checkJobsOfRun (ctx , concurrentRun ); err  !=  nil  {
139- 							return  err 
140- 						} else  {
141- 							jobs  =  append (jobs , js ... )
142- 							updatedJobs  =  append (updatedJobs , ujs ... )
143- 						}
144- 						updatedJob , err  :=  actions_model .GetRunJobByID (ctx , concurrentJob .ID )
145- 						if  err  !=  nil  {
146- 							return  err 
147- 						}
148- 						if  updatedJob .Status  ==  actions_model .StatusWaiting  {
149- 							break 
150- 						}
151- 					}
152- 				}
153- 			}
67+ 		} else  {
68+ 			jobs  =  append (jobs , js ... )
69+ 			updatedJobs  =  append (updatedJobs , ujs ... )
15470		}
15571		return  nil 
15672	}); err  !=  nil  {
@@ -176,6 +92,79 @@ func checkJobsByRunID(ctx context.Context, runID int64) error {
17692	return  nil 
17793}
17894
95+ func  findBlockedRunByConcurrency (ctx  context.Context , repoID  int64 , concurrencyGroup  string ) (* actions_model.ActionRun , bool , error ) {
96+ 	if  concurrencyGroup  ==  ""  {
97+ 		return  nil , false , nil 
98+ 	}
99+ 	cRuns , cJobs , err  :=  actions_model .GetConcurrentRunsAndJobs (ctx , repoID , concurrencyGroup , []actions_model.Status {actions_model .StatusBlocked })
100+ 	if  err  !=  nil  {
101+ 		return  nil , false , fmt .Errorf ("find concurrent runs and jobs: %w" , err )
102+ 	}
103+ 
104+ 	// There can be at most one blocked run or job 
105+ 	var  concurrentRun  * actions_model.ActionRun 
106+ 	if  len (cRuns ) >  0  {
107+ 		concurrentRun  =  cRuns [0 ]
108+ 	} else  if  len (cJobs ) >  0  {
109+ 		jobRun , err  :=  actions_model .GetRunByID (ctx , cJobs [0 ].RunID )
110+ 		if  err  !=  nil  {
111+ 			return  nil , false , fmt .Errorf ("get run by job %d: %w" , cJobs [0 ].ID , err )
112+ 		}
113+ 		concurrentRun  =  jobRun 
114+ 	}
115+ 
116+ 	return  concurrentRun , concurrentRun  !=  nil , nil 
117+ }
118+ 
119+ func  checkRunConcurrency (ctx  context.Context , run  * actions_model.ActionRun ) (jobs , updatedJobs  []* actions_model.ActionRunJob , err  error ) {
120+ 	checkedConcurrencyGroup  :=  make (container.Set [string ])
121+ 
122+ 	// check run (workflow-level) concurrency 
123+ 	if  run .ConcurrencyGroup  !=  ""  {
124+ 		concurrentRun , found , err  :=  findBlockedRunByConcurrency (ctx , run .RepoID , run .ConcurrencyGroup )
125+ 		if  err  !=  nil  {
126+ 			return  nil , nil , fmt .Errorf ("find blocked run by concurrency: %w" , err )
127+ 		}
128+ 		if  found  &&  ! concurrentRun .NeedApproval  {
129+ 			if  js , ujs , err  :=  checkJobsOfRun (ctx , concurrentRun ); err  !=  nil  {
130+ 				return  nil , nil , err 
131+ 			} else  {
132+ 				jobs  =  append (jobs , js ... )
133+ 				updatedJobs  =  append (updatedJobs , ujs ... )
134+ 			}
135+ 		}
136+ 		checkedConcurrencyGroup .Add (run .ConcurrencyGroup )
137+ 	}
138+ 
139+ 	// check job concurrency 
140+ 	runJobs , err  :=  db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {RunID : run .ID })
141+ 	if  err  !=  nil  {
142+ 		return  nil , nil , fmt .Errorf ("find run %d jobs: %w" , run .ID , err )
143+ 	}
144+ 	for  _ , job  :=  range  runJobs  {
145+ 		if  ! job .Status .IsDone () {
146+ 			continue 
147+ 		}
148+ 		if  job .ConcurrencyGroup  ==  ""  &&  checkedConcurrencyGroup .Contains (job .ConcurrencyGroup ) {
149+ 			continue 
150+ 		}
151+ 		concurrentRun , found , err  :=  findBlockedRunByConcurrency (ctx , job .RepoID , job .ConcurrencyGroup )
152+ 		if  err  !=  nil  {
153+ 			return  nil , nil , fmt .Errorf ("find blocked run by concurrency: %w" , err )
154+ 		}
155+ 		if  found  &&  ! concurrentRun .NeedApproval  {
156+ 			if  js , ujs , err  :=  checkJobsOfRun (ctx , concurrentRun ); err  !=  nil  {
157+ 				return  nil , nil , err 
158+ 			} else  {
159+ 				jobs  =  append (jobs , js ... )
160+ 				updatedJobs  =  append (updatedJobs , ujs ... )
161+ 			}
162+ 		}
163+ 		checkedConcurrencyGroup .Add (job .ConcurrencyGroup )
164+ 	}
165+ 	return 
166+ }
167+ 
179168func  checkJobsOfRun (ctx  context.Context , run  * actions_model.ActionRun ) (jobs , updatedJobs  []* actions_model.ActionRunJob , err  error ) {
180169	jobs , err  =  db .Find [actions_model.ActionRunJob ](ctx , actions_model.FindRunJobOptions {RunID : run .ID })
181170	if  err  !=  nil  {
0 commit comments