1+ use actix_web:: web:: Data ;
12use diffbot_lib:: {
23 github:: {
34 github_api:: CheckRun ,
@@ -12,15 +13,14 @@ use octocrab::models::InstallationId;
1213
1314use mysql_async:: { params, prelude:: Queryable } ;
1415
15- use crate :: DataJobSender ;
16+ use crate :: JobScheduler ;
1617
1718async fn handle_pull_request (
1819 payload : PullRequestEventPayload ,
19- job_sender : DataJobSender ,
20- pool : actix_web:: web:: Data < Option < mysql_async:: Pool > > ,
20+ scheduler : & JobScheduler ,
21+ sender : & flume:: Sender < ( String , u64 ) > ,
22+ pool : Option < & mysql_async:: Pool > ,
2123) -> Result < ( ) > {
22- let pool = pool. get_ref ( ) ;
23-
2424 match payload. action . as_str ( ) {
2525 "opened" | "synchronize" => {
2626 let check_run = CheckRun :: create (
@@ -37,9 +37,9 @@ async fn handle_pull_request(
3737 payload. pull_request . number ,
3838 ) ;
3939
40- let num_icons = handle_pull ( payload, job_sender , check_run) . await ?;
40+ let num_icons = handle_pull ( payload, scheduler , sender , check_run) . await ?;
4141
42- if let Some ( ref pool) = pool {
42+ if let Some ( pool) = pool {
4343 let mut conn = match pool. get_conn ( ) . await {
4444 Ok ( conn) => conn,
4545 Err ( e) => {
@@ -81,7 +81,9 @@ async fn handle_pull_request(
8181 Ok ( ( ) )
8282 }
8383 "closed" => {
84- if let Some ( ref pool) = pool {
84+ scheduler. remove ( & ( payload. repository . full_name ( ) , payload. pull_request . number ) ) ;
85+
86+ if let Some ( pool) = pool {
8587 let mut conn = match pool. get_conn ( ) . await {
8688 Ok ( conn) => conn,
8789 Err ( e) => {
@@ -106,7 +108,7 @@ async fn handle_pull_request(
106108 {
107109 tracing:: error!( "{:?}" , e) ;
108110 } ;
109- } ;
111+ }
110112 Ok ( ( ) )
111113 }
112114 _ => Ok ( ( ) ) ,
@@ -115,7 +117,8 @@ async fn handle_pull_request(
115117
116118async fn handle_pull (
117119 payload : PullRequestEventPayload ,
118- job_sender : DataJobSender ,
120+ scheduler : & JobScheduler ,
121+ sender : & flume:: Sender < ( String , u64 ) > ,
119122 check_run : CheckRun ,
120123) -> Result < usize > {
121124 if payload
@@ -188,6 +191,9 @@ async fn handle_pull(
188191
189192 check_run. mark_queued ( ) . await ?;
190193
194+ let scheduler_entry = ( payload. repository . full_name ( ) , payload. pull_request . number ) ;
195+ let scheduler_entry_clone = ( payload. repository . full_name ( ) , payload. pull_request . number ) ;
196+
191197 let pull = payload. pull_request ;
192198 let installation = payload. installation ;
193199
@@ -201,7 +207,20 @@ async fn handle_pull(
201207 installation : InstallationId ( installation. id ) ,
202208 } ;
203209
204- job_sender. send_async ( job) . await ?;
210+ if let Some ( old_job) = match scheduler. entry ( scheduler_entry) {
211+ dashmap:: Entry :: Occupied ( mut entry) => Some ( entry. insert ( job) ) ,
212+ dashmap:: Entry :: Vacant ( entry) => {
213+ entry. insert ( job) ;
214+ None
215+ }
216+ } {
217+ _ = old_job
218+ . check_run
219+ . mark_failed ( "Check cancelled, a later commit has triggered the check" )
220+ . await ;
221+ }
222+
223+ sender. send_async ( scheduler_entry_clone) . await ?;
205224
206225 Ok ( num_icons_diffed)
207226}
@@ -210,9 +229,15 @@ async fn handle_pull(
210229pub async fn process_github_payload_actix (
211230 event : diffbot_lib:: github:: github_api:: GithubEvent ,
212231 payload : String ,
213- job_sender : DataJobSender ,
214- pool : actix_web:: web:: Data < Option < mysql_async:: Pool > > ,
232+ pool : Data < Option < mysql_async:: Pool > > ,
233+ scheduler : Data < JobScheduler > ,
234+ sender : Data < flume:: Sender < ( String , u64 ) > > ,
215235) -> actix_web:: Result < & ' static str > {
236+ let ( pool, scheduler, sender) = (
237+ pool. get_ref ( ) . as_ref ( ) ,
238+ scheduler. get_ref ( ) ,
239+ sender. get_ref ( ) ,
240+ ) ;
216241 // TODO: Handle reruns
217242 if event. 0 != "pull_request" {
218243 return Ok ( "Not a pull request event" ) ;
@@ -231,7 +256,7 @@ pub async fn process_github_payload_actix(
231256
232257 let payload: PullRequestEventPayload = serde_json:: from_str ( & payload) ?;
233258
234- handle_pull_request ( payload, job_sender , pool)
259+ handle_pull_request ( payload, scheduler , sender , pool)
235260 . await
236261 . map_err ( actix_web:: error:: ErrorBadRequest ) ?;
237262
0 commit comments